Adding a rate limiter, and a blank query catcher.
- Fixes #66 - Fixes #67
This commit is contained in:
parent
946b641845
commit
772dd190c7
2
new_torrents_fetcher/Cargo.lock
generated
2
new_torrents_fetcher/Cargo.lock
generated
@ -1,5 +1,3 @@
|
||||
# This file is automatically @generated by Cargo.
|
||||
# It is not intended for manual editing.
|
||||
[[package]]
|
||||
name = "adler32"
|
||||
version = "1.0.3"
|
||||
|
1
server/service/.rustfmt.toml
Normal file
1
server/service/.rustfmt.toml
Normal file
@ -0,0 +1 @@
|
||||
tab_spaces = 2
|
1940
server/service/Cargo.lock
generated
1940
server/service/Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -4,12 +4,14 @@ version = "0.1.0"
|
||||
authors = ["Dessalines <happydooby@gmail.com>"]
|
||||
|
||||
[dependencies]
|
||||
actix-web = "*"
|
||||
actix-web = "1.0"
|
||||
actix-files = "0.1.7"
|
||||
serde = "*"
|
||||
serde_json = "*"
|
||||
serde_derive = "*"
|
||||
time = "*"
|
||||
failure = "*"
|
||||
|
||||
[dependencies.rusqlite]
|
||||
version = "0.15.0"
|
||||
features = ["bundled"]
|
||||
features = ["bundled"]
|
||||
|
@ -1,3 +1,4 @@
|
||||
extern crate actix_files;
|
||||
extern crate actix_web;
|
||||
extern crate serde;
|
||||
extern crate serde_json;
|
||||
@ -5,31 +6,55 @@ extern crate serde_json;
|
||||
extern crate serde_derive;
|
||||
extern crate rusqlite;
|
||||
extern crate time;
|
||||
#[macro_use]
|
||||
extern crate failure;
|
||||
|
||||
use actix_web::{fs, fs::NamedFile, http, server, App, HttpRequest, HttpResponse, Query};
|
||||
use actix_files as fs;
|
||||
use actix_files::NamedFile;
|
||||
use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer};
|
||||
use failure::Error;
|
||||
use std::collections::HashMap;
|
||||
use std::env;
|
||||
use std::ops::Deref;
|
||||
use std::sync::Mutex;
|
||||
use std::time::SystemTime;
|
||||
|
||||
use rusqlite::{Connection};
|
||||
use rusqlite::Connection;
|
||||
|
||||
const RATE_LIMIT: i32 = 10;
|
||||
const RATE_LIMIT_PER_SECOND: i32 = 60;
|
||||
|
||||
pub struct State {
|
||||
rate_limits: Mutex<HashMap<String, RateLimitBucket>>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RateLimitBucket {
|
||||
last_checked: SystemTime,
|
||||
allowance: f64,
|
||||
}
|
||||
|
||||
fn main() {
|
||||
println!("Access me at {}", endpoint());
|
||||
server::new(|| {
|
||||
|
||||
let shared_data = web::Data::new(State {
|
||||
rate_limits: Mutex::new(HashMap::new()),
|
||||
});
|
||||
|
||||
HttpServer::new(move || {
|
||||
App::new()
|
||||
.route("/service/search", http::Method::GET, search)
|
||||
.resource("/", |r| r.f(index))
|
||||
.handler(
|
||||
"/static",
|
||||
fs::StaticFiles::new(front_end_dir())
|
||||
.unwrap()
|
||||
)
|
||||
.finish()
|
||||
}).bind(endpoint())
|
||||
.unwrap()
|
||||
.run();
|
||||
.route("/", web::get().to(index))
|
||||
.service(fs::Files::new("/static", front_end_dir()))
|
||||
.register_data(shared_data.clone())
|
||||
.route("/service/search", web::get().to(search))
|
||||
})
|
||||
.bind(endpoint())
|
||||
.unwrap()
|
||||
.run()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn index(_req: &HttpRequest) -> Result<NamedFile, actix_web::error::Error> {
|
||||
fn index() -> Result<NamedFile, actix_web::error::Error> {
|
||||
Ok(NamedFile::open(front_end_dir() + "/index.html")?)
|
||||
}
|
||||
|
||||
@ -44,28 +69,58 @@ fn torrents_db_file() -> String {
|
||||
fn endpoint() -> String {
|
||||
env::var("TORRENTS_CSV_ENDPOINT").unwrap_or("0.0.0.0:8080".to_string())
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct SearchQuery {
|
||||
q: String,
|
||||
page: Option<usize>,
|
||||
size: Option<usize>,
|
||||
type_: Option<String>
|
||||
type_: Option<String>,
|
||||
}
|
||||
|
||||
fn search(query: Query<SearchQuery>) -> HttpResponse {
|
||||
HttpResponse::Ok()
|
||||
.header("Access-Control-Allow-Origin", "*")
|
||||
.content_type("application/json")
|
||||
.body(search_query(query))
|
||||
fn search(
|
||||
req: HttpRequest,
|
||||
data: web::Data<State>,
|
||||
query: web::Query<SearchQuery>,
|
||||
) -> HttpResponse {
|
||||
let ip = req
|
||||
.connection_info()
|
||||
.remote()
|
||||
.unwrap_or("127.0.0.1:12345")
|
||||
.split(":")
|
||||
.next()
|
||||
.unwrap_or("127.0.0.1")
|
||||
.to_string();
|
||||
|
||||
if query.q.is_empty() {
|
||||
return HttpResponse::BadRequest()
|
||||
.header("Access-Control-Allow-Origin", "*")
|
||||
.content_type("application/json")
|
||||
.body(format!("{{\"error\": \"{}\"}}", "Empty query".to_string()));
|
||||
}
|
||||
|
||||
match check_rate_limit_full(data, &ip, RATE_LIMIT, RATE_LIMIT_PER_SECOND) {
|
||||
Ok(_) => HttpResponse::Ok()
|
||||
.header("Access-Control-Allow-Origin", "*")
|
||||
.content_type("application/json")
|
||||
.body(search_query(query)),
|
||||
Err(e) => HttpResponse::BadRequest()
|
||||
.header("Access-Control-Allow-Origin", "*")
|
||||
.content_type("application/json")
|
||||
.body(format!("{{\"error\": \"{}\"}}", e.to_string())),
|
||||
}
|
||||
}
|
||||
|
||||
fn search_query(query: Query<SearchQuery>) -> String {
|
||||
fn search_query(query: web::Query<SearchQuery>) -> String {
|
||||
let page = query.page.unwrap_or(1);
|
||||
let size = query.size.unwrap_or(10);
|
||||
let type_ = query.type_.as_ref().map_or("torrent", String::deref);
|
||||
let offset = size * (page - 1);
|
||||
|
||||
println!("query = {}, type = {}, page = {}, size = {}", query.q, type_, page, size);
|
||||
println!(
|
||||
"query = {}, type = {}, page = {}, size = {}",
|
||||
query.q, type_, page, size
|
||||
);
|
||||
|
||||
if type_ == "file" {
|
||||
let results = torrent_file_search(&query.q, size, offset);
|
||||
@ -93,20 +148,24 @@ fn torrent_search(query: &str, size: usize, offset: usize) -> Vec<Torrent> {
|
||||
let conn = Connection::open(torrents_db_file()).unwrap();
|
||||
let mut stmt = conn.prepare(&stmt_str).unwrap();
|
||||
let torrent_iter = stmt
|
||||
.query_map(&[
|
||||
query.replace(" ", "%"),
|
||||
size.to_string(),
|
||||
offset.to_string(),
|
||||
], |row| Torrent {
|
||||
infohash: row.get(0),
|
||||
name: row.get(1),
|
||||
size_bytes: row.get(2),
|
||||
created_unix: row.get(3),
|
||||
seeders: row.get(4),
|
||||
leechers: row.get(5),
|
||||
completed: row.get(6),
|
||||
scraped_date: row.get(7),
|
||||
}).unwrap();
|
||||
.query_map(
|
||||
&[
|
||||
query.replace(" ", "%"),
|
||||
size.to_string(),
|
||||
offset.to_string(),
|
||||
],
|
||||
|row| Torrent {
|
||||
infohash: row.get(0),
|
||||
name: row.get(1),
|
||||
size_bytes: row.get(2),
|
||||
created_unix: row.get(3),
|
||||
seeders: row.get(4),
|
||||
leechers: row.get(5),
|
||||
completed: row.get(6),
|
||||
scraped_date: row.get(7),
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let mut torrents = Vec::new();
|
||||
for torrent in torrent_iter {
|
||||
@ -133,21 +192,25 @@ fn torrent_file_search(query: &str, size: usize, offset: usize) -> Vec<File> {
|
||||
let conn = Connection::open(torrents_db_file()).unwrap();
|
||||
let mut stmt = conn.prepare(&stmt_str).unwrap();
|
||||
let file_iter = stmt
|
||||
.query_map(&[
|
||||
query.replace(" ", "%"),
|
||||
size.to_string(),
|
||||
offset.to_string(),
|
||||
], |row| File {
|
||||
infohash: row.get(0),
|
||||
index_: row.get(1),
|
||||
path: row.get(2),
|
||||
size_bytes: row.get(3),
|
||||
created_unix: row.get(4),
|
||||
seeders: row.get(5),
|
||||
leechers: row.get(6),
|
||||
completed: row.get(7),
|
||||
scraped_date: row.get(8),
|
||||
}).unwrap();
|
||||
.query_map(
|
||||
&[
|
||||
query.replace(" ", "%"),
|
||||
size.to_string(),
|
||||
offset.to_string(),
|
||||
],
|
||||
|row| File {
|
||||
infohash: row.get(0),
|
||||
index_: row.get(1),
|
||||
path: row.get(2),
|
||||
size_bytes: row.get(3),
|
||||
created_unix: row.get(4),
|
||||
seeders: row.get(5),
|
||||
leechers: row.get(6),
|
||||
completed: row.get(7),
|
||||
scraped_date: row.get(8),
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let mut files = Vec::new();
|
||||
for file in file_iter {
|
||||
@ -156,6 +219,56 @@ fn torrent_file_search(query: &str, size: usize, offset: usize) -> Vec<File> {
|
||||
files
|
||||
}
|
||||
|
||||
fn check_rate_limit_full(
|
||||
state: web::Data<State>,
|
||||
ip: &str,
|
||||
rate: i32,
|
||||
per: i32,
|
||||
) -> Result<(), Error> {
|
||||
let mut rate_limits = state.rate_limits.lock().unwrap();
|
||||
|
||||
if rate_limits.get_mut(ip).is_none() {
|
||||
rate_limits.insert(
|
||||
ip.to_string(),
|
||||
RateLimitBucket {
|
||||
last_checked: SystemTime::now(),
|
||||
allowance: -2f64,
|
||||
},
|
||||
);
|
||||
}
|
||||
if let Some(rate_limit) = rate_limits.get_mut(ip) {
|
||||
// The initial value
|
||||
if rate_limit.allowance == -2f64 {
|
||||
rate_limit.allowance = rate as f64;
|
||||
};
|
||||
|
||||
let current = SystemTime::now();
|
||||
let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
|
||||
rate_limit.last_checked = current;
|
||||
rate_limit.allowance += time_passed * (rate as f64 / per as f64);
|
||||
if rate_limit.allowance > rate as f64 {
|
||||
rate_limit.allowance = rate as f64;
|
||||
}
|
||||
|
||||
if rate_limit.allowance < 1.0 {
|
||||
println!(
|
||||
"Rate limited IP: {}, time_passed: {}, allowance: {}",
|
||||
&ip, time_passed, rate_limit.allowance
|
||||
);
|
||||
Err(format_err!(
|
||||
"Too many requests for IP: {}. {} per {} seconds",
|
||||
&ip,
|
||||
rate,
|
||||
per
|
||||
))?
|
||||
} else {
|
||||
rate_limit.allowance -= 1.0;
|
||||
Ok(())
|
||||
}
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
@ -164,8 +277,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test() {
|
||||
let start = PreciseTime::now();
|
||||
let results =
|
||||
super::torrent_search("sherlock", 10, 0);
|
||||
let results = super::torrent_search("sherlock", 10, 0);
|
||||
assert!(results.len() > 2);
|
||||
let end = PreciseTime::now();
|
||||
println!("Query took {} seconds.", start.to(end));
|
||||
|
Loading…
x
Reference in New Issue
Block a user