Merge branch 'sig_helper'
Some checks are pending
Invidious CI / build (push) Waiting to run

This commit is contained in:
Fijxu 2024-08-10 16:17:09 -04:00
commit b7430c5a5a
Signed by: Fijxu
GPG key ID: 32C1DDF333EDA6A4
9 changed files with 466 additions and 122 deletions

View file

@ -1,6 +1,6 @@
#########################################
#
# Database configuration
# Database and other external servers
#
#########################################
@ -41,6 +41,19 @@ db:
#check_tables: false
##
## Path to an external signature resolver, used to emulate
## the Youtube client's Javascript. If no such server is
## available, some videos will not be playable.
##
## When this setting is commented out, no external
## resolver will be used.
##
## Accepted values: a path to a UNIX socket or "<IP>:<Port>"
## Default: <none>
##
#signature_server:
#########################################
#
@ -343,21 +356,6 @@ full_refresh: false
##
feed_threads: 1
##
## Enable/Disable the polling job that keeps the decryption
## function (for "secured" videos) up to date.
##
## Note: This part of the code generate a small amount of data every minute.
## This may not be desired if you have bandwidth limits set by your ISP.
##
## Note 2: This part of the code is currently broken, so changing
## this setting has no impact.
##
## Accepted values: true, false
## Default: false
##
#decrypt_polling: false
jobs:

View file

@ -151,6 +151,15 @@ Invidious::Database.check_integrity(CONFIG)
{% puts "\nDone checking player dependencies, now compiling Invidious...\n" %}
{% end %}
# Misc
DECRYPT_FUNCTION =
if sig_helper_address = CONFIG.signature_server.presence
IV::DecryptFunction.new(sig_helper_address)
else
nil
end
# Start jobs
if CONFIG.channel_threads > 0

View file

@ -136,6 +136,10 @@ class Config
# Connect to YouTube over 'ipv6', 'ipv4'. Will sometimes resolve fix issues with rate-limiting (see https://github.com/ytdl-org/youtube-dl/issues/21729)
@[YAML::Field(converter: Preferences::FamilyConverter)]
property force_resolve : Socket::Family = Socket::Family::UNSPEC
# External signature solver server socket (either a path to a UNIX domain socket or "<IP>:<Port>")
property signature_server : String? = nil
# Port to listen for connections (overridden by command line argument)
property port : Int32 = 3000
# Host to bind (overridden by command line argument)

View file

@ -3,9 +3,9 @@
# IPv6 addresses.
#
class TCPSocket
def initialize(host : String, port, dns_timeout = nil, connect_timeout = nil, family = Socket::Family::UNSPEC)
def initialize(host, port, dns_timeout = nil, connect_timeout = nil, blocking = false, family = Socket::Family::UNSPEC)
Addrinfo.tcp(host, port, timeout: dns_timeout, family: family) do |addrinfo|
super(addrinfo.family, addrinfo.type, addrinfo.protocol)
super(addrinfo.family, addrinfo.type, addrinfo.protocol, blocking)
connect(addrinfo, timeout: connect_timeout) do |error|
close
error
@ -26,7 +26,7 @@ class HTTP::Client
end
hostname = @host.starts_with?('[') && @host.ends_with?(']') ? @host[1..-2] : @host
io = TCPSocket.new hostname, @port, @dns_timeout, @connect_timeout, @family
io = TCPSocket.new hostname, @port, @dns_timeout, @connect_timeout, family: @family
io.read_timeout = @read_timeout if @read_timeout
io.write_timeout = @write_timeout if @write_timeout
io.sync = false
@ -35,7 +35,7 @@ class HTTP::Client
if tls = @tls
tcp_socket = io
begin
io = OpenSSL::SSL::Socket::Client.new(tcp_socket, context: tls, sync_close: true, hostname: @host)
io = OpenSSL::SSL::Socket::Client.new(tcp_socket, context: tls, sync_close: true, hostname: @host.rchop('.'))
rescue exc
# don't leak the TCP socket when the SSL connection failed
tcp_socket.close

View file

@ -0,0 +1,332 @@
require "uri"
require "socket"
require "socket/tcp_socket"
require "socket/unix_socket"
{% if flag?(:advanced_debug) %}
require "io/hexdump"
{% end %}
private alias NetworkEndian = IO::ByteFormat::NetworkEndian
module Invidious::SigHelper
enum UpdateStatus
Updated
UpdateNotRequired
Error
end
# -------------------
# Payload types
# -------------------
abstract struct Payload
end
struct StringPayload < Payload
getter string : String
def initialize(str : String)
raise Exception.new("SigHelper: String can't be empty") if str.empty?
@string = str
end
def self.from_bytes(slice : Bytes)
size = IO::ByteFormat::NetworkEndian.decode(UInt16, slice)
if size == 0 # Error code
raise Exception.new("SigHelper: Server encountered an error")
end
if (slice.bytesize - 2) != size
raise Exception.new("SigHelper: String size mismatch")
end
if str = String.new(slice[2..])
return self.new(str)
else
raise Exception.new("SigHelper: Can't read string from socket")
end
end
def to_io(io)
# `.to_u16` raises if there is an overflow during the conversion
io.write_bytes(@string.bytesize.to_u16, NetworkEndian)
io.write(@string.to_slice)
end
end
private enum Opcode
FORCE_UPDATE = 0
DECRYPT_N_SIGNATURE = 1
DECRYPT_SIGNATURE = 2
GET_SIGNATURE_TIMESTAMP = 3
GET_PLAYER_STATUS = 4
PLAYER_UPDATE_TIMESTAMP = 5
end
private record Request,
opcode : Opcode,
payload : Payload?
# ----------------------
# High-level functions
# ----------------------
class Client
@mux : Multiplexor
def initialize(uri_or_path)
@mux = Multiplexor.new(uri_or_path)
end
# Forces the server to re-fetch the YouTube player, and extract the necessary
# components from it (nsig function code, sig function code, signature timestamp).
def force_update : UpdateStatus
request = Request.new(Opcode::FORCE_UPDATE, nil)
value = send_request(request) do |bytes|
IO::ByteFormat::NetworkEndian.decode(UInt16, bytes)
end
case value
when 0x0000 then return UpdateStatus::Error
when 0xFFFF then return UpdateStatus::UpdateNotRequired
when 0xF44F then return UpdateStatus::Updated
else
code = value.nil? ? "nil" : value.to_s(base: 16)
raise Exception.new("SigHelper: Invalid status code received #{code}")
end
end
# Decrypt a provided n signature using the server's current nsig function
# code, and return the result (or an error).
def decrypt_n_param(n : String) : String?
request = Request.new(Opcode::DECRYPT_N_SIGNATURE, StringPayload.new(n))
n_dec = self.send_request(request) do |bytes|
StringPayload.from_bytes(bytes).string
end
return n_dec
end
# Decrypt a provided s signature using the server's current sig function
# code, and return the result (or an error).
def decrypt_sig(sig : String) : String?
request = Request.new(Opcode::DECRYPT_SIGNATURE, StringPayload.new(sig))
sig_dec = self.send_request(request) do |bytes|
StringPayload.from_bytes(bytes).string
end
return sig_dec
end
# Return the signature timestamp from the server's current player
def get_signature_timestamp : UInt64?
request = Request.new(Opcode::GET_SIGNATURE_TIMESTAMP, nil)
return self.send_request(request) do |bytes|
IO::ByteFormat::NetworkEndian.decode(UInt64, bytes)
end
end
# Return the current player's version
def get_player : UInt32?
request = Request.new(Opcode::GET_PLAYER_STATUS, nil)
return self.send_request(request) do |bytes|
has_player = (bytes[0] == 0xFF)
player_version = IO::ByteFormat::NetworkEndian.decode(UInt32, bytes[1..4])
has_player ? player_version : nil
end
end
# Return when the player was last updated
def get_player_timestamp : UInt64?
request = Request.new(Opcode::PLAYER_UPDATE_TIMESTAMP, nil)
return self.send_request(request) do |bytes|
IO::ByteFormat::NetworkEndian.decode(UInt64, bytes)
end
end
private def send_request(request : Request, &)
channel = @mux.send(request)
slice = channel.receive
return yield slice
rescue ex
LOGGER.debug("SigHelper: Error when sending a request")
LOGGER.trace(ex.inspect_with_backtrace)
return nil
end
end
# ---------------------
# Low level functions
# ---------------------
class Multiplexor
alias TransactionID = UInt32
record Transaction, channel = ::Channel(Bytes).new
@prng = Random.new
@mutex = Mutex.new
@queue = {} of TransactionID => Transaction
@conn : Connection
def initialize(uri_or_path)
@conn = Connection.new(uri_or_path)
listen
end
def listen : Nil
raise "Socket is closed" if @conn.closed?
LOGGER.debug("SigHelper: Multiplexor listening")
# TODO: reopen socket if unexpectedly closed
spawn do
loop do
receive_data
Fiber.yield
end
end
end
def send(request : Request)
transaction = Transaction.new
transaction_id = @prng.rand(TransactionID)
# Add transaction to queue
@mutex.synchronize do
# On a 32-bits random integer, this should never happen. Though, just in case, ...
if @queue[transaction_id]?
raise Exception.new("SigHelper: Duplicate transaction ID! You got a shiny pokemon!")
end
@queue[transaction_id] = transaction
end
write_packet(transaction_id, request)
return transaction.channel
end
def receive_data
transaction_id, slice = read_packet
@mutex.synchronize do
if transaction = @queue.delete(transaction_id)
# Remove transaction from queue and send data to the channel
transaction.channel.send(slice)
LOGGER.trace("SigHelper: Transaction unqueued and data sent to channel")
else
raise Exception.new("SigHelper: Received transaction was not in queue")
end
end
end
# Read a single packet from the socket
private def read_packet : {TransactionID, Bytes}
# Header
transaction_id = @conn.read_bytes(UInt32, NetworkEndian)
length = @conn.read_bytes(UInt32, NetworkEndian)
LOGGER.trace("SigHelper: Recv transaction 0x#{transaction_id.to_s(base: 16)} / length #{length}")
if length > 67_000
raise Exception.new("SigHelper: Packet longer than expected (#{length})")
end
# Payload
slice = Bytes.new(length)
@conn.read(slice) if length > 0
LOGGER.trace("SigHelper: payload = #{slice}")
LOGGER.trace("SigHelper: Recv transaction 0x#{transaction_id.to_s(base: 16)} - Done")
return transaction_id, slice
end
# Write a single packet to the socket
private def write_packet(transaction_id : TransactionID, request : Request)
LOGGER.trace("SigHelper: Send transaction 0x#{transaction_id.to_s(base: 16)} / opcode #{request.opcode}")
io = IO::Memory.new(1024)
io.write_bytes(request.opcode.to_u8, NetworkEndian)
io.write_bytes(transaction_id, NetworkEndian)
if payload = request.payload
payload.to_io(io)
end
@conn.send(io)
@conn.flush
LOGGER.trace("SigHelper: Send transaction 0x#{transaction_id.to_s(base: 16)} - Done")
end
end
class Connection
@socket : UNIXSocket | TCPSocket
{% if flag?(:advanced_debug) %}
@io : IO::Hexdump
{% end %}
def initialize(host_or_path : String)
case host_or_path
when .starts_with?('/')
# Make sure that the file exists
if File.exists?(host_or_path)
@socket = UNIXSocket.new(host_or_path)
else
raise Exception.new("SigHelper: '#{host_or_path}' no such file")
end
when .starts_with?("tcp://")
uri = URI.parse(host_or_path)
@socket = TCPSocket.new(uri.host.not_nil!, uri.port.not_nil!)
else
uri = URI.parse("tcp://#{host_or_path}")
@socket = TCPSocket.new(uri.host.not_nil!, uri.port.not_nil!)
end
LOGGER.info("SigHelper: Using helper at '#{host_or_path}'")
{% if flag?(:advanced_debug) %}
@io = IO::Hexdump.new(@socket, output: STDERR, read: true, write: true)
{% end %}
@socket.sync = false
@socket.blocking = false
end
def closed? : Bool
return @socket.closed?
end
def close : Nil
@socket.close if !@socket.closed?
end
def flush(*args, **options)
@socket.flush(*args, **options)
end
def send(*args, **options)
@socket.send(*args, **options)
end
# Wrap IO functions, with added debug tooling if needed
{% for function in %w(read read_bytes write write_bytes) %}
def {{function.id}}(*args, **options)
{% if flag?(:advanced_debug) %}
@io.{{function.id}}(*args, **options)
{% else %}
@socket.{{function.id}}(*args, **options)
{% end %}
end
{% end %}
end
end

View file

@ -1,73 +1,55 @@
alias SigProc = Proc(Array(String), Int32, Array(String))
require "http/params"
require "./sig_helper"
struct DecryptFunction
@decrypt_function = [] of {SigProc, Int32}
@decrypt_time = Time.monotonic
class Invidious::DecryptFunction
@last_update : Time = Time.utc - 42.days
def initialize(@use_polling = true)
def initialize(uri_or_path)
@client = SigHelper::Client.new(uri_or_path)
self.check_update
end
def update_decrypt_function
@decrypt_function = fetch_decrypt_function
def check_update
now = Time.utc
# If we have updated in the last 5 minutes, do nothing
return if (now - @last_update) > 5.minutes
# Get the amount of time elapsed since when the player was updated, in the
# event where multiple invidious processes are run in parallel.
update_time_elapsed = (@client.get_player_timestamp || 301).seconds
if update_time_elapsed > 5.minutes
LOGGER.debug("Signature: Player might be outdated, updating")
@client.force_update
@last_update = Time.utc
end
end
private def fetch_decrypt_function(id = "CvFH_6DNRCY")
document = YT_POOL.client &.get("/watch?v=#{id}&gl=US&hl=en").body
url = document.match(/src="(?<url>\/s\/player\/[^\/]+\/player_ias[^\/]+\/en_US\/base.js)"/).not_nil!["url"]
player = YT_POOL.client &.get(url).body
function_name = player.match(/^(?<name>[^=]+)=function\(\w\){\w=\w\.split\(""\);[^\. ]+\.[^( ]+/m).not_nil!["name"]
function_body = player.match(/^#{Regex.escape(function_name)}=function\(\w\){(?<body>[^}]+)}/m).not_nil!["body"]
function_body = function_body.split(";")[1..-2]
var_name = function_body[0][0, 2]
var_body = player.delete("\n").match(/var #{Regex.escape(var_name)}={(?<body>(.*?))};/).not_nil!["body"]
operations = {} of String => SigProc
var_body.split("},").each do |operation|
op_name = operation.match(/^[^:]+/).not_nil![0]
op_body = operation.match(/\{[^}]+/).not_nil![0]
case op_body
when "{a.reverse()"
operations[op_name] = ->(a : Array(String), _b : Int32) { a.reverse }
when "{a.splice(0,b)"
operations[op_name] = ->(a : Array(String), b : Int32) { a.delete_at(0..(b - 1)); a }
else
operations[op_name] = ->(a : Array(String), b : Int32) { c = a[0]; a[0] = a[b % a.size]; a[b % a.size] = c; a }
end
end
decrypt_function = [] of {SigProc, Int32}
function_body.each do |function|
function = function.lchop(var_name).delete("[].")
op_name = function.match(/[^\(]+/).not_nil![0]
value = function.match(/\(\w,(?<value>[\d]+)\)/).not_nil!["value"].to_i
decrypt_function << {operations[op_name], value}
end
return decrypt_function
def decrypt_nsig(n : String) : String?
self.check_update
return @client.decrypt_n_param(n)
rescue ex
LOGGER.debug(ex.message || "Signature: Unknown error")
LOGGER.trace(ex.inspect_with_backtrace)
return nil
end
def decrypt_signature(fmt : Hash(String, JSON::Any))
return "" if !fmt["s"]? || !fmt["sp"]?
def decrypt_signature(str : String) : String?
self.check_update
return @client.decrypt_sig(str)
rescue ex
LOGGER.debug(ex.message || "Signature: Unknown error")
LOGGER.trace(ex.inspect_with_backtrace)
return nil
end
sp = fmt["sp"].as_s
sig = fmt["s"].as_s.split("")
if !@use_polling
now = Time.monotonic
if now - @decrypt_time > 60.seconds || @decrypt_function.size == 0
@decrypt_function = fetch_decrypt_function
@decrypt_time = Time.monotonic
end
end
@decrypt_function.each do |proc, value|
sig = proc.call(sig, value)
end
return "&#{sp}=#{sig.join("")}"
def get_sts : UInt64?
self.check_update
return @client.get_signature_timestamp
rescue ex
LOGGER.debug(ex.message || "Signature: Unknown error")
LOGGER.trace(ex.inspect_with_backtrace)
return nil
end
end

View file

@ -1,14 +0,0 @@
class Invidious::Jobs::UpdateDecryptFunctionJob < Invidious::Jobs::BaseJob
def begin
loop do
begin
DECRYPT_FUNCTION.update_decrypt_function
rescue ex
LOGGER.error("UpdateDecryptFunctionJob : #{ex.message}")
ensure
sleep 1.minute
Fiber.yield
end
end
end
end

View file

@ -98,20 +98,47 @@ struct Video
# Methods for parsing streaming data
def convert_url(fmt)
if cfr = fmt["signatureCipher"]?.try { |json| HTTP::Params.parse(json.as_s) }
sp = cfr["sp"]
url = URI.parse(cfr["url"])
params = url.query_params
LOGGER.debug("Videos: Decoding '#{cfr}'")
unsig = DECRYPT_FUNCTION.try &.decrypt_signature(cfr["s"])
params[sp] = unsig if unsig
else
url = URI.parse(fmt["url"].as_s)
params = url.query_params
end
n = DECRYPT_FUNCTION.try &.decrypt_nsig(params["n"])
params["n"] = n if n
params["host"] = url.host.not_nil!
if region = self.info["region"]?.try &.as_s
params["region"] = region
end
url.query_params = params
LOGGER.trace("Videos: new url is '#{url}'")
return url.to_s
rescue ex
LOGGER.debug("Videos: Error when parsing video URL")
LOGGER.trace(ex.inspect_with_backtrace)
return ""
end
def fmt_stream
return @fmt_stream.as(Array(Hash(String, JSON::Any))) if @fmt_stream
fmt_stream = info["streamingData"]?.try &.["formats"]?.try &.as_a.map &.as_h || [] of Hash(String, JSON::Any)
fmt_stream.each do |fmt|
if s = (fmt["cipher"]? || fmt["signatureCipher"]?).try { |h| HTTP::Params.parse(h.as_s) }
s.each do |k, v|
fmt[k] = JSON::Any.new(v)
end
fmt["url"] = JSON::Any.new("#{fmt["url"]}#{DECRYPT_FUNCTION.decrypt_signature(fmt)}")
end
fmt_stream = info.dig?("streamingData", "formats")
.try &.as_a.map &.as_h || [] of Hash(String, JSON::Any)
fmt["url"] = JSON::Any.new("#{fmt["url"]}&host=#{URI.parse(fmt["url"].as_s).host}")
fmt["url"] = JSON::Any.new("#{fmt["url"]}&region=#{self.info["region"]}") if self.info["region"]?
fmt_stream.each do |fmt|
fmt["url"] = JSON::Any.new(self.convert_url(fmt))
end
fmt_stream.sort_by! { |f| f["width"]?.try &.as_i || 0 }
@ -121,21 +148,17 @@ struct Video
def adaptive_fmts
return @adaptive_fmts.as(Array(Hash(String, JSON::Any))) if @adaptive_fmts
fmt_stream = info["streamingData"]?.try &.["adaptiveFormats"]?.try &.as_a.map &.as_h || [] of Hash(String, JSON::Any)
fmt_stream.each do |fmt|
if s = (fmt["cipher"]? || fmt["signatureCipher"]?).try { |h| HTTP::Params.parse(h.as_s) }
s.each do |k, v|
fmt[k] = JSON::Any.new(v)
end
fmt["url"] = JSON::Any.new("#{fmt["url"]}#{DECRYPT_FUNCTION.decrypt_signature(fmt)}")
end
fmt["url"] = JSON::Any.new("#{fmt["url"]}&host=#{URI.parse(fmt["url"].as_s).host}")
fmt["url"] = JSON::Any.new("#{fmt["url"]}&region=#{self.info["region"]}") if self.info["region"]?
fmt_stream = info.dig("streamingData", "adaptiveFormats")
.try &.as_a.map &.as_h || [] of Hash(String, JSON::Any)
fmt_stream.each do |fmt|
fmt["url"] = JSON::Any.new(self.convert_url(fmt))
end
fmt_stream.sort_by! { |f| f["width"]?.try &.as_i || 0 }
@adaptive_fmts = fmt_stream
return @adaptive_fmts.as(Array(Hash(String, JSON::Any)))
end

View file

@ -272,7 +272,7 @@ module YoutubeAPI
# Return, as a Hash, the "context" data required to request the
# youtube API endpoints.
#
private def make_context(client_config : ClientConfig | Nil) : Hash
private def make_context(client_config : ClientConfig | Nil, video_id = "dQw4w9WgXcQ") : Hash
# Use the default client config if nil is passed
client_config ||= DEFAULT_CLIENT_CONFIG
@ -298,7 +298,7 @@ module YoutubeAPI
if client_config.screen == "EMBED"
client_context["thirdParty"] = {
"embedUrl" => "https://www.youtube.com/embed/dQw4w9WgXcQ",
"embedUrl" => "https://www.youtube.com/embed/#{video_id}",
} of String => String | Int64
end
@ -459,19 +459,29 @@ module YoutubeAPI
params : String,
client_config : ClientConfig | Nil = nil
)
# Playback context, separate because it can be different between clients
playback_ctx = {
"html5Preference" => "HTML5_PREF_WANTS",
"referer" => "https://www.youtube.com/watch?v=#{video_id}",
} of String => String | Int64
if {"WEB", "TVHTML5"}.any? { |s| client_config.name.starts_with? s }
if sts = DECRYPT_FUNCTION.try &.get_sts
playback_ctx["signatureTimestamp"] = sts.to_i64
end
end
# JSON Request data, required by the API
data = {
"contentCheckOk" => true,
"videoId" => video_id,
"context" => self.make_context(client_config),
"context" => self.make_context(client_config, video_id),
"racyCheckOk" => true,
"user" => {
"lockedSafetyMode" => false,
},
"playbackContext" => {
"contentPlaybackContext" => {
"html5Preference": "HTML5_PREF_WANTS",
},
"contentPlaybackContext" => playback_ctx,
},
"serviceIntegrityDimensions" => {
"poToken": REDIS_DB.get("POTOKEN"),