Compare commits

..

5 commits

Author SHA1 Message Date
sweep-ai[bot]
05b1537b61
Merge main into sweep/add-sweep-config 2023-08-11 00:03:55 +00:00
sweep-ai[bot]
7ea09b5803
Create refactor template 2023-08-10 23:22:31 +00:00
sweep-ai[bot]
0547b4537b
Create feature template 2023-08-10 23:22:31 +00:00
sweep-ai[bot]
3fe711b1c7
Create bugfix template 2023-08-10 23:22:30 +00:00
sweep-ai[bot]
de46f099e9
Create sweep.yaml config file 2023-08-10 23:22:30 +00:00
11 changed files with 1364 additions and 1925 deletions

11
.github/ISSUE_TEMPLATE/sweep-bugfix.yml vendored Normal file
View file

@ -0,0 +1,11 @@
name: Bugfix
title: 'Sweep: '
description: Write something like "We notice ... behavior when ... happens instead of ...""
labels: sweep
body:
- type: textarea
id: description
attributes:
label: Details
description: More details about the bug
placeholder: The bug might be in ... file

View file

@ -0,0 +1,11 @@
name: Feature Request
title: 'Sweep: '
description: Write something like "Write an api endpoint that does "..." in the "..." file"
labels: sweep
body:
- type: textarea
id: description
attributes:
label: Details
description: More details for Sweep
placeholder: The new endpoint should use the ... class from ... file because it contains ... logic

View file

@ -0,0 +1,11 @@
name: Refactor
title: 'Sweep: '
description: Write something like "Modify the ... api endpoint to use ... version and ... framework"
labels: sweep
body:
- type: textarea
id: description
attributes:
label: Details
description: More details for Sweep
placeholder: We are migrating this function to ... version because ...

View file

@ -14,16 +14,9 @@ jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v3
- uses: Swatinem/rust-cache@v2
- uses: rui314/setup-mold@v1
- name: Set up NASM
uses: ilammy/setup-nasm@v1.5.1
uses: ilammy/setup-nasm@v1.4.0
- name: Build
run: RUSTFLAGS='-C target-feature=+crt-static' cargo build --release --target x86_64-unknown-linux-gnu
- run: mv target/x86_64-unknown-linux-gnu/release/piped-proxy piped-proxy
- name: Upload artifact
uses: actions/upload-artifact@v4
with:
name: piped-proxy
path: piped-proxy
run: cargo build --release

View file

@ -4,7 +4,7 @@ jobs:
clippy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v3
- name: Install clippy
uses: dtolnay/rust-toolchain@stable
with:

2430
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -6,43 +6,22 @@ version = "0.1.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# Web Requests & Async Runtime
tokio = { version = "1.37.0", features = ["full"] }
actix-web = "4.5.1"
reqwest = { version = "0.12.9", features = ["stream", "brotli", "gzip", "socks"], default-features = false }
actix-web = "4.3.1"
image = "0.24.7"
libwebp-sys = { version = "0.9.2", optional = true }
mimalloc = "0.1.37"
once_cell = "1.18.0"
qstring = "0.7.2"
# Alternate Allocator
mimalloc = { version = "0.1.41", optional = true }
# Transcoding Images to WebP/AVIF to save bandwidth
image = { version = "0.25.1", features = ["jpeg", "webp", "rayon"], default-features = false, optional = true }
libwebp-sys = { version = "0.12.0", optional = true }
ravif = { version = "0.11.5", optional = true }
rgb = { version = "0.8.37", optional = true }
once_cell = "1.19.0"
regex = "1.10.4"
blake3 = { version = "1.5.5", optional = true }
bytes = "1.9.0"
futures-util = "0.3.30"
listenfd = "1.0.1"
http = "1.2.0"
ravif = { version = "0.11.2", optional = true }
rgb = { version = "0.8.36", optional = true }
regex = "1.9.3"
reqwest = { version = "0.11.18", features = ["rustls-tls", "stream", "brotli", "gzip"], default-features = false }
tokio = { version = "1.30.0", features = ["full"] }
[features]
default = ["webp", "mimalloc", "reqwest-rustls", "qhash"]
reqwest-rustls = ["reqwest/rustls-tls"]
reqwest-native-tls = ["reqwest/default-tls"]
avif = ["dep:ravif", "dep:rgb", "dep:image"]
webp = ["dep:libwebp-sys", "dep:image"]
mimalloc = ["dep:mimalloc"]
optimized = ["libwebp-sys?/sse41", "libwebp-sys?/avx2", "libwebp-sys?/neon"]
qhash = ["blake3"]
default = ["webp"]
avif = ["dep:ravif", "dep:rgb"]
webp = ["dep:libwebp-sys"]
[profile.release]
lto = true
lto = true

View file

@ -1,97 +1,34 @@
mod ump_stream;
mod utils;
use std::env;
use std::error::Error;
use actix_web::http::StatusCode;
use actix_web::{web, App, HttpRequest, HttpResponse, HttpResponseBuilder, HttpServer};
use listenfd::ListenFd;
use actix_web::{App, HttpRequest, HttpResponse, HttpResponseBuilder, HttpServer, web};
use actix_web::http::Method;
use mimalloc::MiMalloc;
use once_cell::sync::Lazy;
use qstring::QString;
use regex::Regex;
use reqwest::{Body, Client, Request, Url};
use std::error::Error;
use std::io::ErrorKind;
use std::net::TcpListener;
use std::os::unix::net::UnixListener;
use std::str::FromStr;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{env, io};
#[cfg(not(any(feature = "reqwest-native-tls", feature = "reqwest-rustls")))]
compile_error!("feature \"reqwest-native-tls\" or \"reqwest-rustls\" must be set for proxy to have TLS support");
use futures_util::TryStreamExt;
use http::{HeaderName, Method};
use reqwest::header::HeaderValue;
#[cfg(any(feature = "webp", feature = "avif", feature = "qhash"))]
use tokio::task::spawn_blocking;
use ump_stream::UmpTransformStream;
#[cfg(feature = "mimalloc")]
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
fn try_get_fd_listeners() -> (Option<UnixListener>, Option<TcpListener>) {
let mut fd = ListenFd::from_env();
let unix_listener = env::var("FD_UNIX").ok().map(|fd_unix| {
let fd_pos = fd_unix.parse().expect("FD_UNIX is not a number");
println!("Trying to take Unix socket at position {}", fd_pos);
fd.take_unix_listener(fd_pos)
.expect(format!("fd {} is not a Unix socket", fd_pos).as_str())
.expect(format!("fd {} has already been used", fd_pos).as_str())
});
let tcp_listener = env::var("FD_TCP").ok().map(|fd_tcp| {
let fd_pos = fd_tcp.parse().expect("FD_TCP is not a number");
println!("Trying to take TCP listener at position {}", fd_pos);
fd.take_tcp_listener(fd_pos)
.expect(format!("fd {} is not a TCP listener", fd_pos).as_str())
.expect(format!("fd {} has already been used", fd_pos).as_str())
});
(unix_listener, tcp_listener)
}
static GLOBAL: MiMalloc = MiMalloc;
#[actix_web::main]
async fn main() -> std::io::Result<()> {
println!("Running server!");
let mut server = HttpServer::new(|| {
let server = HttpServer::new(|| {
// match all requests
App::new().default_service(web::to(index))
});
let fd_listeners = try_get_fd_listeners();
if let Some(unix_listener) = fd_listeners.0 {
server = server
.listen_uds(unix_listener)
.expect("Error while trying to listen on Unix socket passed by fd");
println!("Listening on Unix socket passed by fd.");
// get port from env
if env::var("UDS").is_ok() {
server.bind_uds("./socket/actix.sock")?
} else {
let bind = env::var("BIND").unwrap_or_else(|_| "0.0.0.0:8080".to_string());
server.bind(bind)?
}
if let Some(tcp_listener) = fd_listeners.1 {
server = server
.listen(tcp_listener)
.expect("Error while trying to listen on TCP listener passed by fd");
println!("Listening on TCP listener passed by fd.");
}
// Only bind manually if there is not already a listener
if server.addrs().is_empty() {
// get socket/port from env
// backwards compat when only UDS is set
server = if utils::get_env_bool("UDS") {
let socket_path =
env::var("BIND_UNIX").unwrap_or_else(|_| "./socket/actix.sock".to_string());
server.bind_uds(socket_path)?
} else {
let bind = env::var("BIND").unwrap_or_else(|_| "0.0.0.0:8080".to_string());
server.bind(bind)?
};
}
server.run().await
.run()
.await
}
static RE_DOMAIN: Lazy<Regex> =
@ -104,31 +41,14 @@ static CLIENT: Lazy<Client> = Lazy::new(|| {
let builder = Client::builder()
.user_agent("Mozilla/5.0 (Windows NT 10.0; rv:102.0) Gecko/20100101 Firefox/102.0");
let proxy = if let Ok(proxy) = env::var("PROXY") {
reqwest::Proxy::all(proxy).ok()
} else {
None
};
let builder = if let Some(proxy) = proxy {
// proxy basic auth
if let Ok(proxy_auth_user) = env::var("PROXY_USER") {
let proxy_auth_pass = env::var("PROXY_PASS").unwrap_or_default();
builder.proxy(proxy.basic_auth(&proxy_auth_user, &proxy_auth_pass))
} else {
builder.proxy(proxy)
}
} else {
if env::var("IPV4_ONLY").is_ok() {
builder
};
if utils::get_env_bool("IPV4_ONLY") {
builder.local_address("0.0.0.0".parse().ok())
.local_address(Some("0.0.0.0".parse().unwrap()))
.build()
.unwrap()
} else {
builder
builder.build().unwrap()
}
.build()
.unwrap()
});
const ANDROID_USER_AGENT: &str = "com.google.android.youtube/1537338816 (Linux; U; Android 13; en_US; ; Build/TQ2A.230505.002; Cronet/113.0.5672.24)";
@ -166,180 +86,56 @@ fn is_header_allowed(header: &str) -> bool {
| "report-to"
| "strict-transport-security"
| "user-agent"
| "range"
| "transfer-encoding"
| "x-real-ip"
| "origin"
| "referer"
// the 'x-title' header contains non-ascii characters which is not allowed on some HTTP clients
| "x-title"
)
}
async fn index(req: HttpRequest) -> Result<HttpResponse, Box<dyn Error>> {
if req.method() == actix_web::http::Method::OPTIONS {
if req.method() == Method::OPTIONS {
let mut response = HttpResponse::Ok();
add_headers(&mut response);
return Ok(response.finish());
} else if req.method() != actix_web::http::Method::GET
&& req.method() != actix_web::http::Method::HEAD
{
} else if req.method() != Method::GET && req.method() != Method::HEAD {
let mut response = HttpResponse::MethodNotAllowed();
add_headers(&mut response);
return Ok(response.finish());
}
// parse query string
let mut query = QString::from(req.query_string());
let query = QString::from(req.query_string());
#[cfg(feature = "qhash")]
{
use std::collections::BTreeSet;
let res = query.get("host");
let res = res.map(|s| s.to_string());
let secret = env::var("HASH_SECRET");
if let Ok(secret) = secret {
let Some(qhash) = query.get("qhash") else {
return Err("No qhash provided".into());
};
if qhash.len() != 8 {
return Err("Invalid qhash provided".into());
}
let path = req.path().as_bytes().to_owned();
// Store sorted key-value pairs
let mut set = BTreeSet::new();
{
let pairs = query.to_pairs();
for (key, value) in &pairs {
if matches!(*key, "qhash" | "range" | "rewrite") {
continue;
}
set.insert((key.as_bytes().to_owned(), value.as_bytes().to_owned()));
}
}
let hash = spawn_blocking(move || {
let mut hasher = blake3::Hasher::new();
for (key, value) in set {
hasher.update(&key);
hasher.update(&value);
}
let range_marker = b"/range/";
// Find the slice before "/range/"
if let Some(position) = path
.windows(range_marker.len())
.position(|window| window == range_marker)
{
// Update the hasher with the part of the path before "/range/"
// We add +1 to include the "/" in the hash
// This is done for DASH streams for the manifests provided by YouTube
hasher.update(&path[..(position + 1)]);
} else {
hasher.update(&path);
}
hasher.update(secret.as_bytes());
let hash = hasher.finalize().to_hex();
hash[..8].to_owned()
})
.await
.unwrap();
if hash != qhash {
return Err("Invalid qhash provided".into());
}
}
}
let Some(host) = query.get("host").map(|s| s.to_string()) else {
if res.is_none() {
return Err("No host provided".into());
};
#[cfg(any(feature = "webp", feature = "avif"))]
let disallow_image_transcoding = utils::get_env_bool("DISALLOW_IMAGE_TRANSCODING");
}
let rewrite = query.get("rewrite") != Some("false");
#[cfg(feature = "avif")]
let avif = query.get("avif") == Some("true");
let Some(domain) = RE_DOMAIN
.captures(host.as_str())
.map(|domain| domain.get(1).unwrap().as_str())
else {
let host = res.unwrap();
let domain = RE_DOMAIN.captures(host.as_str());
if domain.is_none() {
return Err("Invalid host provided".into());
};
}
let domain = domain.unwrap().get(1).unwrap().as_str();
if !ALLOWED_DOMAINS.contains(&domain) {
return Err("Domain not allowed".into());
}
let video_playback = req.path().eq("/videoplayback");
if video_playback {
if let Some(expiry) = query.get("expire") {
let expiry = expiry.parse::<i64>()?;
let now = SystemTime::now();
let now = now.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs() as i64;
if now > expiry {
return Err("Expire time in past".into());
}
}
}
let is_android = video_playback && query.get("c").unwrap_or("").eq("ANDROID");
let is_web = video_playback && query.get("c").unwrap_or("").eq("WEB");
let is_ump = video_playback && query.get("ump").is_some();
let mime_type = query.get("mime").map(|s| s.to_string());
let clen = query
.get("clen")
.map(|s| s.to_string().parse::<u64>().unwrap());
if video_playback && !query.has("range") {
if let Some(range) = req.headers().get("range") {
let range = range.to_str().unwrap();
let range = range.replace("bytes=", "");
let range = range.split('-').collect::<Vec<_>>();
let start = range[0].parse::<u64>().unwrap();
let end = match range[1].parse::<u64>() {
Ok(end) => end,
Err(_) => {
if let Some(clen) = clen {
clen - 1
} else {
0
}
}
};
if end != 0 {
let range = format!("{}-{}", start, end);
query.add_pair(("range", range));
}
} else if let Some(clen) = clen {
let range = format!("0-{}", clen - 1);
query.add_pair(("range", range));
}
}
let range = query.get("range").map(|s| s.to_string());
let qs = {
let collected = query
.into_pairs()
.into_iter()
.filter(|(key, _)| !matches!(key.as_str(), "host" | "rewrite" | "qhash"))
.filter(|(key, _)| key != "host" && key != "rewrite")
.collect::<Vec<_>>();
QString::new(collected)
};
@ -348,38 +144,40 @@ async fn index(req: HttpRequest) -> Result<HttpResponse, Box<dyn Error>> {
url.set_query(Some(qs.to_string().as_str()));
let method = {
if is_web && video_playback {
if !is_android && video_playback {
Method::POST
} else {
Method::from_str(req.method().as_str())?
req.method().clone()
}
};
let mut request = Request::new(method, url);
if is_web && video_playback {
if !is_android && video_playback {
request.body_mut().replace(Body::from("x\0"));
}
let request_headers = request.headers_mut();
for (key, value) in req.headers() {
let key = key.as_str();
if is_header_allowed(key) {
request_headers.insert(
HeaderName::from_str(key)?,
HeaderValue::from_bytes(value.as_bytes())?,
);
if is_header_allowed(key.as_str()) {
request_headers.insert(key, value.clone());
}
}
if is_android {
request_headers.insert("User-Agent", ANDROID_USER_AGENT.parse()?);
request_headers.insert("User-Agent", ANDROID_USER_AGENT.parse().unwrap());
}
let resp = CLIENT.execute(request).await?;
let resp = CLIENT.execute(request).await;
let mut response = HttpResponse::build(StatusCode::from_u16(resp.status().as_u16())?);
if resp.is_err() {
return Err(resp.err().unwrap().into());
}
let resp = resp?;
let mut response = HttpResponse::build(resp.status());
add_headers(&mut response);
@ -392,81 +190,73 @@ async fn index(req: HttpRequest) -> Result<HttpResponse, Box<dyn Error>> {
if rewrite {
if let Some(content_type) = resp.headers().get("content-type") {
#[cfg(feature = "avif")]
if !disallow_image_transcoding
&& (content_type == "image/webp" || content_type == "image/jpeg" && avif)
{
if content_type == "image/webp" || content_type == "image/jpeg" && avif {
use ravif::{Encoder, Img};
use rgb::FromSlice;
let resp_bytes = resp.bytes().await.unwrap();
let (body, content_type) = spawn_blocking(|| {
use ravif::{Encoder, Img};
use rgb::FromSlice;
let image = image::load_from_memory(&resp_bytes).unwrap();
let image = image::load_from_memory(&resp_bytes).unwrap();
let width = image.width() as usize;
let height = image.height() as usize;
let width = image.width() as usize;
let height = image.height() as usize;
let buf = image.into_rgb8();
let buf = buf.as_raw().as_rgb();
let buf = image.into_rgb8();
let buf = buf.as_raw().as_rgb();
let buffer = Img::new(buf, width, height);
let buffer = Img::new(buf, width, height);
let res = Encoder::new()
.with_quality(80f32)
.with_speed(7)
.encode_rgb(buffer);
let res = Encoder::new()
.with_quality(80f32)
.with_speed(7)
.encode_rgb(buffer);
if let Ok(res) = res {
(res.avif_file.to_vec(), "image/avif")
} else {
(resp_bytes.into(), "image/jpeg")
}
})
.await
.unwrap();
response.content_type(content_type);
return Ok(response.body(body));
return if let Ok(res) = res {
response.content_type("image/avif");
Ok(response.body(res.avif_file.to_vec()))
} else {
response.content_type("image/jpeg");
Ok(response.body(resp_bytes))
};
}
#[cfg(feature = "webp")]
if !disallow_image_transcoding && content_type == "image/jpeg" {
if content_type == "image/jpeg" {
use libwebp_sys::{WebPEncodeRGB, WebPFree};
let resp_bytes = resp.bytes().await.unwrap();
let (body, content_type) = spawn_blocking(|| {
use libwebp_sys::{WebPEncodeRGB, WebPFree};
let image = image::load_from_memory(&resp_bytes).unwrap();
let width = image.width();
let height = image.height();
let image = image::load_from_memory(&resp_bytes).unwrap();
let width = image.width();
let height = image.height();
let quality = 85;
let quality = 85;
let data = image.as_rgb8().unwrap().as_raw();
let data = image.as_rgb8().unwrap().as_raw();
let bytes: Vec<u8> = unsafe {
let mut out_buf = std::ptr::null_mut();
let stride = width as i32 * 3;
let len: usize = WebPEncodeRGB(
data.as_ptr(),
width as i32,
height as i32,
stride,
quality as f32,
&mut out_buf,
);
let vec = std::slice::from_raw_parts(out_buf, len).into();
WebPFree(out_buf as *mut _);
vec
};
let bytes: Vec<u8> = unsafe {
let mut out_buf = std::ptr::null_mut();
let stride = width as i32 * 3;
let len: usize = WebPEncodeRGB(
data.as_ptr(),
width as i32,
height as i32,
stride,
quality as f32,
&mut out_buf,
);
let vec = std::slice::from_raw_parts(out_buf, len).into();
WebPFree(out_buf as *mut _);
vec
};
if bytes.len() < resp_bytes.len() {
(bytes, "image/webp")
} else {
(resp_bytes.into(), "image/jpeg")
}
})
.await
.unwrap();
response.content_type(content_type);
return Ok(response.body(body));
if bytes.len() < resp_bytes.len() {
response.content_type("image/webp");
return Ok(response.body(bytes));
}
response.content_type("image/jpeg");
return Ok(response.body(resp_bytes));
}
if content_type == "application/x-mpegurl"
@ -481,13 +271,11 @@ async fn index(req: HttpRequest) -> Result<HttpResponse, Box<dyn Error>> {
if let Some(captures) = captures {
let url = captures.get(1).unwrap().as_str();
if url.starts_with("https://") {
return line.replace(
url,
utils::localize_url(url, host.as_str()).as_str(),
);
return line
.replace(url, localize_url(url, host.as_str()).as_str());
}
}
utils::localize_url(line, host.as_str())
localize_url(line, host.as_str())
})
.collect::<Vec<String>>()
.join("\n");
@ -495,64 +283,43 @@ async fn index(req: HttpRequest) -> Result<HttpResponse, Box<dyn Error>> {
return Ok(response.body(modified));
}
if content_type == "video/vnd.mpeg.dash.mpd" || content_type == "application/dash+xml" {
let resp_str = resp.text().await.unwrap();
let mut new_resp = resp_str.clone();
let captures = RE_DASH_MANIFEST.captures_iter(&resp_str);
let mut resp_str = resp.text().await.unwrap();
let clone_resp = resp_str.clone();
let captures = RE_DASH_MANIFEST.captures_iter(&clone_resp);
for capture in captures {
let url = capture.get(1).unwrap().as_str();
let new_url = utils::localize_url(url, host.as_str());
let new_url = utils::escape_xml(new_url.as_str());
new_resp = new_resp.replace(url, new_url.as_ref());
let new_url = localize_url(url, host.as_str());
resp_str = resp_str.replace(url, new_url.as_str());
}
return Ok(response.body(new_resp));
return Ok(response.body(resp_str));
}
}
}
if let Some(content_length) = resp.headers().get("content-length") {
response.no_chunking(content_length.to_str().unwrap().parse::<u64>().unwrap());
}
if is_ump && resp.status().is_success() {
if let Some(mime_type) = mime_type {
response.content_type(mime_type);
}
if req.headers().contains_key("range") {
// check if it's not the whole stream
if let Some(ref range) = range {
if let Some(clen) = clen {
if range != &format!("0-{}", clen - 1) {
response.status(StatusCode::PARTIAL_CONTENT);
}
}
}
}
let resp = resp.bytes_stream();
let resp = resp.map_err(|e| io::Error::new(ErrorKind::Other, e));
let transformed_stream = UmpTransformStream::new(resp);
// print errors
let transformed_stream = transformed_stream.map_err(|e| {
eprintln!("UMP Transforming Error: {}", e);
e
});
// calculate content length from clen and range
if let Some(clen) = clen {
let length = if let Some(ref range) = range {
let range = range.replace("bytes=", "");
let range = range.split('-').collect::<Vec<_>>();
let start = range[0].parse::<u64>().unwrap();
let end = range[1].parse::<u64>().unwrap_or(clen - 1);
end - start + 1
} else {
clen
};
response.no_chunking(length);
}
return Ok(response.streaming(transformed_stream));
response.append_header(("content-length", content_length));
}
// Stream response
Ok(response.streaming(resp.bytes_stream()))
}
fn localize_url(url: &str, host: &str) -> String {
if url.starts_with("https://") {
let mut url = Url::parse(url).unwrap();
let host = url.host().unwrap().to_string();
// set host query param
url.query_pairs_mut().append_pair("host", &host);
return format!("{}?{}", url.path(), url.query().unwrap());
} else if url.ends_with(".m3u8") || url.ends_with(".ts") {
return if url.contains('?') {
format!("{}&host={}", url, host)
} else {
format!("{}?host={}", url, host)
};
}
url.to_string()
}

View file

@ -1,154 +0,0 @@
use crate::utils;
use bytes::{Bytes, BytesMut};
use futures_util::Stream;
use std::io;
use std::io::ErrorKind;
use std::pin::Pin;
use std::task::{Context, Poll};
fn read_variable_integer(buf: &[u8], offset: usize) -> io::Result<(i32, usize)> {
let mut pos = offset;
let prefix = utils::read_buf(buf, &mut pos);
let mut size = 0;
for shift in 1..=5 {
if prefix & (128 >> (shift - 1)) == 0 {
size = shift;
break;
}
}
if !(1..=5).contains(&size) {
return Err(io::Error::new(
ErrorKind::InvalidData,
format!("Invalid integer size {} at position {}", size, offset),
));
}
match size {
1 => Ok((prefix as i32, size)),
2 => {
let value = ((utils::read_buf(buf, &mut pos) as i32) << 6) | (prefix as i32 & 0b111111);
Ok((value, size))
}
3 => {
let value = (((utils::read_buf(buf, &mut pos) as i32)
| ((utils::read_buf(buf, &mut pos) as i32) << 8))
<< 5)
| (prefix as i32 & 0b11111);
Ok((value, size))
}
4 => {
let value = (((utils::read_buf(buf, &mut pos) as i32)
| ((utils::read_buf(buf, &mut pos) as i32) << 8)
| ((utils::read_buf(buf, &mut pos) as i32) << 16))
<< 4)
| (prefix as i32 & 0b1111);
Ok((value, size))
}
_ => {
let value = (utils::read_buf(buf, &mut pos) as i32)
| ((utils::read_buf(buf, &mut pos) as i32) << 8)
| ((utils::read_buf(buf, &mut pos) as i32) << 16)
| ((utils::read_buf(buf, &mut pos) as i32) << 24);
Ok((value, size))
}
}
}
pub struct UmpTransformStream<S>
where
S: Stream<Item = Result<Bytes, io::Error>> + Unpin,
{
inner: S,
buffer: BytesMut,
found_stream: bool,
remaining: usize,
}
impl<S> UmpTransformStream<S>
where
S: Stream<Item = Result<Bytes, io::Error>> + Unpin,
{
pub fn new(stream: S) -> Self {
UmpTransformStream {
inner: stream,
buffer: BytesMut::new(),
found_stream: false,
remaining: 0,
}
}
}
impl<S> Stream for UmpTransformStream<S>
where
S: Stream<Item = Result<Bytes, io::Error>> + Unpin,
{
type Item = Result<Bytes, io::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
while let Poll::Ready(item) = Pin::new(&mut this.inner).poll_next(cx) {
match item {
Some(Ok(bytes)) => {
if this.found_stream {
if this.remaining > 0 {
let len = std::cmp::min(this.remaining, bytes.len());
this.remaining -= len;
if this.remaining == 0 {
this.buffer.clear();
this.buffer.extend_from_slice(&bytes[len..]);
this.found_stream = false;
}
return Poll::Ready(Some(Ok(bytes.slice(0..len))));
} else {
this.found_stream = false;
this.buffer.clear();
this.buffer.extend_from_slice(&bytes);
};
} else {
this.buffer.extend_from_slice(&bytes);
}
}
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
None => {
return Poll::Ready(None);
}
}
}
if !this.found_stream && !this.buffer.is_empty() {
let (segment_type, s1) = match read_variable_integer(&this.buffer, 0) {
Ok(result) => result,
Err(_) => return Poll::Pending,
};
let (segment_length, s2) = match read_variable_integer(&this.buffer, s1) {
Ok(result) => result,
Err(_) => return Poll::Pending,
};
if segment_type != 21 {
// Not the stream
if this.buffer.len() > s1 + s2 + segment_length as usize {
let _ = this.buffer.split_to(s1 + s2 + segment_length as usize);
}
} else {
this.remaining = segment_length as usize - 1;
let _ = this.buffer.split_to(s1 + s2 + 1);
if this.buffer.len() > segment_length as usize {
let len = std::cmp::min(this.remaining, this.buffer.len());
this.remaining -= len;
return Poll::Ready(Some(Ok(this.buffer.split_to(len).into())));
} else {
this.remaining -= this.buffer.len();
this.found_stream = true;
return Poll::Ready(Some(Ok(this.buffer.to_vec().into())));
}
}
}
Poll::Pending
}
}

View file

@ -1,103 +0,0 @@
use qstring::QString;
use reqwest::Url;
use std::borrow::Cow;
use std::collections::BTreeMap;
use std::env;
pub fn read_buf(buf: &[u8], pos: &mut usize) -> u8 {
let byte = buf[*pos];
*pos += 1;
byte
}
fn finalize_url(path: &str, query: BTreeMap<String, String>) -> String {
#[cfg(feature = "qhash")]
{
use std::collections::BTreeSet;
let qhash = {
let secret = env::var("HASH_SECRET");
if let Ok(secret) = secret {
let set = query
.iter()
.filter(|(key, _)| !matches!(key.as_str(), "qhash" | "range" | "rewrite"))
.map(|(key, value)| (key.as_bytes().to_owned(), value.as_bytes().to_owned()))
.collect::<BTreeSet<_>>();
let mut hasher = blake3::Hasher::new();
for (key, value) in set {
hasher.update(&key);
hasher.update(&value);
}
hasher.update(path.as_bytes());
hasher.update(secret.as_bytes());
let hash = hasher.finalize().to_hex();
Some(hash[..8].to_owned())
} else {
None
}
};
if let Some(qhash) = qhash {
let mut query = QString::new(query.into_iter().collect::<Vec<_>>());
query.add_pair(("qhash", qhash));
return format!("{}?{}", path, query);
}
}
let query = QString::new(query.into_iter().collect::<Vec<_>>());
format!("{}?{}", path, query)
}
pub fn localize_url(url: &str, host: &str) -> String {
if url.starts_with("https://") {
let url = Url::parse(url).unwrap();
let host = url.host().unwrap().to_string();
let mut query = url.query_pairs().into_owned().collect::<BTreeMap<_, _>>();
query.insert("host".to_string(), host.clone());
return finalize_url(url.path(), query);
} else if url.ends_with(".m3u8") || url.ends_with(".ts") {
let mut query = BTreeMap::new();
query.insert("host".to_string(), host.to_string());
return finalize_url(url, query);
}
url.to_string()
}
pub fn escape_xml(raw: &str) -> Cow<'_, str> {
if !raw.contains(&['<', '>', '&', '\'', '"'][..]) {
// If there are no characters to escape, return the original string.
Cow::Borrowed(raw)
} else {
// If there are characters to escape, build a new string with the replacements.
let mut escaped = String::with_capacity(raw.len());
for c in raw.chars() {
match c {
'<' => escaped.push_str("&lt;"),
'>' => escaped.push_str("&gt;"),
'&' => escaped.push_str("&amp;"),
'\'' => escaped.push_str("&apos;"),
'"' => escaped.push_str("&quot;"),
_ => escaped.push(c),
}
}
Cow::Owned(escaped)
}
}
pub fn get_env_bool(key: &str) -> bool {
match env::var(key) {
Ok(val) => val.to_lowercase() == "true" || val == "1",
Err(_) => false,
}
}

12
sweep.yaml Normal file
View file

@ -0,0 +1,12 @@
# Sweep AI turns bug fixes & feature requests into code changes (https://sweep.dev)
# For details on our config file, check out our docs at https://docs.sweep.dev
# If you use this be sure to frequently sync your default branch(main, master) to dev.
branch: 'main'
# By default Sweep will read the logs and outputs from your existing Github Actions. To disable this, set this to false.
gha_enabled: True
# This is the description of your project. It will be used by sweep when creating PRs. You can tell Sweep what's unique about your project, what frameworks you use, or anything else you want.
# Here's an example: sweepai/sweep is a python project. The main api endpoints are in sweepai/api.py. Write code that adheres to PEP8.
description: ''
# Default Values: https://github.com/sweepai/sweep/blob/main/sweep.yaml