load users from a file
This commit is contained in:
parent
9aef7121fc
commit
eb2b5c876c
|
@ -27,5 +27,7 @@ pub enum Command {
|
|||
file: PathBuf,
|
||||
#[clap(short, long)]
|
||||
channel_id: String,
|
||||
#[clap(short, long)]
|
||||
users_file: PathBuf,
|
||||
},
|
||||
}
|
||||
|
|
|
@ -76,9 +76,11 @@ async fn main() -> anyhow::Result<()> {
|
|||
channel_id,
|
||||
jobs,
|
||||
}) => migrate(db, source_dir, channel_id, jobs).await,
|
||||
Some(Command::MigrateSupibot { file, channel_id }) => {
|
||||
migrator::supibot::run(db, &file, &channel_id).await
|
||||
}
|
||||
Some(Command::MigrateSupibot {
|
||||
file,
|
||||
channel_id,
|
||||
users_file,
|
||||
}) => migrator::supibot::run(db, &file, &channel_id, &users_file).await,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
mod users;
|
||||
|
||||
use self::users::IvrUser;
|
||||
|
||||
use super::INSERT_BATCH_SIZE;
|
||||
use crate::{
|
||||
db::schema::{Message, MESSAGES_TABLE},
|
||||
|
@ -11,14 +9,26 @@ use anyhow::Context;
|
|||
use chrono::NaiveDateTime;
|
||||
use clickhouse::inserter::Inserter;
|
||||
use serde::Deserialize;
|
||||
use std::{borrow::Cow, collections::HashMap, fs::File, io::BufReader, path::Path, time::Duration};
|
||||
use tracing::info;
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
collections::{HashMap, HashSet},
|
||||
fs::File,
|
||||
io::BufReader,
|
||||
path::Path,
|
||||
time::Duration,
|
||||
};
|
||||
use tracing::{error, info};
|
||||
|
||||
// Exmaple: 2023-06-23 14:46:26.588
|
||||
const DATE_FMT: &str = "%F %X%.3f";
|
||||
const USERS_REQUEST_CHUNK_SIZE: usize = 50;
|
||||
|
||||
pub async fn run(db: clickhouse::Client, file_path: &Path, channel_id: &str) -> anyhow::Result<()> {
|
||||
pub async fn run(
|
||||
db: clickhouse::Client,
|
||||
file_path: &Path,
|
||||
channel_id: &str,
|
||||
users_file_path: &Path,
|
||||
) -> anyhow::Result<()> {
|
||||
info!("Loading file {file_path:?}");
|
||||
|
||||
let inserter = db
|
||||
|
@ -31,13 +41,19 @@ pub async fn run(db: clickhouse::Client, file_path: &Path, channel_id: &str) ->
|
|||
.with_period(Some(Duration::from_secs(15)));
|
||||
|
||||
let mut users_client = UsersClient::default();
|
||||
let channel_user = users_client.get_user(channel_id).await?;
|
||||
users_client
|
||||
.add_from_file(users_file_path)
|
||||
.context("Could not read the users file")?;
|
||||
|
||||
let channel_user = users_client.get_user_login(channel_id).await?;
|
||||
|
||||
let migrator = SupibotMigrator {
|
||||
users_client,
|
||||
non_cached_messages: HashMap::with_capacity(USERS_REQUEST_CHUNK_SIZE),
|
||||
inserter,
|
||||
channel_user,
|
||||
channel_id: channel_id.to_owned(),
|
||||
channel_login: channel_user,
|
||||
invalid_user_ids: HashSet::new(),
|
||||
};
|
||||
|
||||
migrator.migrate_channel(file_path).await
|
||||
|
@ -49,7 +65,9 @@ struct SupibotMigrator {
|
|||
/// Indexed by user id
|
||||
non_cached_messages: HashMap<String, Vec<SupibotMessage>>,
|
||||
inserter: Inserter<Message<'static>>,
|
||||
channel_user: IvrUser,
|
||||
channel_id: String,
|
||||
channel_login: String,
|
||||
invalid_user_ids: HashSet<String>,
|
||||
}
|
||||
|
||||
impl SupibotMigrator {
|
||||
|
@ -65,14 +83,16 @@ impl SupibotMigrator {
|
|||
let supibot_message = result?;
|
||||
|
||||
if supibot_message.historic == 0 {
|
||||
if let Some(user) = self
|
||||
if let Some(user_login) = self
|
||||
.users_client
|
||||
.get_cached_user(&supibot_message.platform_id)
|
||||
.get_cached_user_login(&supibot_message.platform_id)
|
||||
{
|
||||
write_message(
|
||||
supibot_message,
|
||||
user,
|
||||
&self.channel_user,
|
||||
&supibot_message,
|
||||
user_login,
|
||||
&supibot_message.platform_id,
|
||||
&self.channel_login,
|
||||
&self.channel_id,
|
||||
&mut self.inserter,
|
||||
)
|
||||
.await?;
|
||||
|
@ -87,22 +107,19 @@ impl SupibotMigrator {
|
|||
}
|
||||
}
|
||||
} else {
|
||||
let user = self
|
||||
let user_id = self
|
||||
.users_client
|
||||
.get_user_by_name(&supibot_message.platform_id)
|
||||
.get_user_id_by_name(&supibot_message.platform_id)
|
||||
.await?
|
||||
// Used when the user id cannot be retrieved
|
||||
.unwrap_or_else(|| IvrUser {
|
||||
id: String::new(),
|
||||
display_name: supibot_message.platform_id.clone(),
|
||||
login: supibot_message.platform_id.clone(),
|
||||
chat_color: None,
|
||||
});
|
||||
.unwrap_or_default();
|
||||
|
||||
write_message(
|
||||
supibot_message,
|
||||
&user,
|
||||
&self.channel_user,
|
||||
&supibot_message,
|
||||
&supibot_message.platform_id,
|
||||
&user_id,
|
||||
&self.channel_login,
|
||||
&self.channel_id,
|
||||
&mut self.inserter,
|
||||
)
|
||||
.await?;
|
||||
|
@ -134,6 +151,10 @@ impl SupibotMigrator {
|
|||
);
|
||||
}
|
||||
|
||||
if !self.invalid_user_ids.is_empty() {
|
||||
error!("Invalid user ids: {:?}", self.invalid_user_ids);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -142,12 +163,25 @@ impl SupibotMigrator {
|
|||
let users = self.users_client.get_users(&user_ids).await?;
|
||||
|
||||
for (user_id, messages) in self.non_cached_messages.drain() {
|
||||
let user = users
|
||||
.get(&user_id)
|
||||
.with_context(|| format!("User {user_id} is not in the users response",))?;
|
||||
|
||||
for message in messages {
|
||||
write_message(message, user, &self.channel_user, &mut self.inserter).await?;
|
||||
match users.get(&user_id) {
|
||||
Some(user_login) => {
|
||||
for message in messages {
|
||||
write_message(
|
||||
&message,
|
||||
user_login,
|
||||
&user_id,
|
||||
&self.channel_login,
|
||||
&self.channel_id,
|
||||
&mut self.inserter,
|
||||
)
|
||||
.await?;
|
||||
// write_message(message, user, &self.channel_user, &mut self.inserter)
|
||||
// .await?;
|
||||
}
|
||||
}
|
||||
None => {
|
||||
self.invalid_user_ids.insert(user_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -156,9 +190,11 @@ impl SupibotMigrator {
|
|||
}
|
||||
|
||||
async fn write_message(
|
||||
supibot_message: SupibotMessage,
|
||||
user: &IvrUser,
|
||||
channel_user: &IvrUser,
|
||||
supibot_message: &SupibotMessage,
|
||||
user_login: &str,
|
||||
user_id: &str,
|
||||
channel_login: &str,
|
||||
channel_id: &str,
|
||||
inserter: &mut Inserter<Message<'_>>,
|
||||
) -> anyhow::Result<()> {
|
||||
let text = &supibot_message.text;
|
||||
|
@ -169,17 +205,15 @@ async fn write_message(
|
|||
|
||||
let raw = format!(
|
||||
"@id=;returning-chatter=0;turbo=0;mod=0;room-id={channel_id};subscriber=;tmi-sent-ts={timestamp};badge-info=;user-id={user_id};badges=;user-type=;display-name={display_name};flags=;emotes=;first-msg=0;color={color} :{login}!{login}@{login}.tmi.twitch.tv PRIVMSG #{channel_login} :{text}",
|
||||
channel_id = &channel_user.id,
|
||||
channel_login = &channel_user.login,
|
||||
display_name = &user.display_name,
|
||||
user_id = &user.id,
|
||||
login = &user.login,
|
||||
color = user.chat_color.as_deref().unwrap_or_default(),
|
||||
display_name = user_login,
|
||||
user_id = user_id,
|
||||
login = user_login,
|
||||
color = "",
|
||||
);
|
||||
|
||||
let message = Message {
|
||||
channel_id: Cow::Owned(channel_user.id.clone()),
|
||||
user_id: Cow::Owned(user.id.clone()),
|
||||
channel_id: Cow::Owned(channel_id.to_owned()),
|
||||
user_id: Cow::Owned(user_id.to_owned()),
|
||||
timestamp,
|
||||
raw: Cow::Owned(raw),
|
||||
};
|
||||
|
|
|
@ -1,28 +1,54 @@
|
|||
use anyhow::{anyhow, Context};
|
||||
use serde::Deserialize;
|
||||
use std::collections::HashMap;
|
||||
use tracing::{debug, warn};
|
||||
use std::{collections::HashMap, fs::File, io::BufReader, path::Path};
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct UsersClient {
|
||||
client: reqwest::Client,
|
||||
users: HashMap<String, IvrUser>,
|
||||
users: HashMap<String, String>,
|
||||
// Names mapped to ids
|
||||
names: HashMap<String, Option<String>>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct FileUser {
|
||||
#[serde(rename = "Name")]
|
||||
name: String,
|
||||
#[serde(rename = "ID")]
|
||||
id: String,
|
||||
}
|
||||
|
||||
impl UsersClient {
|
||||
pub fn add_from_file(&mut self, file_path: &Path) -> anyhow::Result<()> {
|
||||
info!("Loading users from {file_path:?}");
|
||||
|
||||
let file = File::open(file_path)?;
|
||||
let reader = BufReader::new(file);
|
||||
let rdr = csv::Reader::from_reader(reader);
|
||||
|
||||
for user in rdr.into_deserialize::<FileUser>() {
|
||||
let user = user?;
|
||||
self.users.insert(user.id.clone(), user.name.clone());
|
||||
self.names.insert(user.name, Some(user.id));
|
||||
}
|
||||
|
||||
info!("{} users loaded", self.users.len());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_users(
|
||||
&mut self,
|
||||
ids: &[impl AsRef<str>],
|
||||
) -> anyhow::Result<HashMap<String, IvrUser>> {
|
||||
) -> anyhow::Result<HashMap<String, String>> {
|
||||
let mut ids_to_request = Vec::with_capacity(ids.len());
|
||||
let mut response_users = HashMap::with_capacity(ids.len());
|
||||
|
||||
for id in ids {
|
||||
match self.users.get(id.as_ref()) {
|
||||
Some(user) => {
|
||||
response_users.insert(user.id.clone(), user.clone());
|
||||
Some(name) => {
|
||||
response_users.insert(id.as_ref().to_owned(), name.clone());
|
||||
}
|
||||
None => {
|
||||
ids_to_request.push(id.as_ref());
|
||||
|
@ -60,22 +86,20 @@ impl UsersClient {
|
|||
for result in results {
|
||||
let api_response = result?;
|
||||
for user in api_response {
|
||||
self.users.insert(user.id.clone(), user.clone());
|
||||
response_users.insert(user.id.clone(), user);
|
||||
self.users.insert(user.id.clone(), user.login.clone());
|
||||
response_users.insert(user.id.clone(), user.login);
|
||||
}
|
||||
}
|
||||
|
||||
if !ids_to_request.is_empty() {}
|
||||
|
||||
Ok(response_users)
|
||||
}
|
||||
|
||||
pub async fn get_user(&mut self, id: &str) -> anyhow::Result<IvrUser> {
|
||||
pub async fn get_user_login(&mut self, id: &str) -> anyhow::Result<String> {
|
||||
let users = self.get_users(&[id]).await?;
|
||||
users.into_values().next().context("Empty ivr response")
|
||||
}
|
||||
|
||||
pub async fn get_user_by_name(&mut self, name: &str) -> anyhow::Result<Option<IvrUser>> {
|
||||
pub async fn get_user_id_by_name(&mut self, name: &str) -> anyhow::Result<Option<String>> {
|
||||
match self.names.get(name) {
|
||||
Some(id) => Ok(id.as_ref().map(|id| self.users.get(id).cloned().unwrap())),
|
||||
None => {
|
||||
|
@ -103,8 +127,8 @@ impl UsersClient {
|
|||
match users.into_iter().next() {
|
||||
Some(user) => {
|
||||
self.names.insert(user.login.clone(), Some(user.id.clone()));
|
||||
self.users.insert(user.id.clone(), user.clone());
|
||||
Ok(Some(user))
|
||||
self.users.insert(user.id.clone(), user.login.clone());
|
||||
Ok(Some(user.login))
|
||||
}
|
||||
None => {
|
||||
warn!("User {name} cannot be retrieved");
|
||||
|
@ -116,8 +140,8 @@ impl UsersClient {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn get_cached_user(&self, id: &str) -> Option<&IvrUser> {
|
||||
self.users.get(id)
|
||||
pub fn get_cached_user_login(&self, id: &str) -> Option<&str> {
|
||||
self.users.get(id).map(|s| s.as_str())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue