mldonkey-users
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Mldonkey-users] [PATCH] BT/DHT: show stats, choose random port


From: ygrek
Subject: [Mldonkey-users] [PATCH] BT/DHT: show stats, choose random port
Date: Sun, 20 Mar 2011 13:31:50 +0200

---
 src/networks/bittorrent/bTGlobals.ml |   27 +++++++++++-
 src/networks/bittorrent/bTOptions.ml |    4 +-
 src/networks/bittorrent/bT_DHT.ml    |   80 ++++++++++++++++++++++++++-------
 src/networks/bittorrent/kademlia.ml  |    1 +
 4 files changed, 92 insertions(+), 20 deletions(-)

diff --git src/networks/bittorrent/bTGlobals.ml src/networks/bittorrent/bTGlobals.ml
index f6c9b99..8d8c5e7 100644
--- src/networks/bittorrent/bTGlobals.ml
+++ src/networks/bittorrent/bTGlobals.ml
@@ -925,7 +925,7 @@ Define a function to be called when the "mem_stats" command
 
 **************************************************************)
 
-let _ =
+let () =
   Heap.add_memstat "BittorrentGlobals" (fun level buf ->
      Printf.bprintf buf "Number of old files: %d\n" (List.length !!old_files);
      let downloads = ref 0 in
@@ -940,3 +940,28 @@ let _ =
      Printf.bprintf buf "files_by_uid: %d\n" (Hashtbl.length files_by_uid);
      Printf.bprintf buf "ft_by_num: %d\n" (Hashtbl.length ft_by_num);
   )
+
+open BT_DHT
+
+let () =
+  Heap.add_memstat "BittorrentDHT" (fun _level buf ->
+    match !bt_dht with
+    | None -> ()
+    | Some dht ->
+    let (buckets,nodes,keys,peers) = stat dht in
+    Printf.bprintf buf "Routing : %d nodes in %d buckets\n" nodes buckets;
+    Printf.bprintf buf "Storage : %d keys with %d peers\n" keys peers;
+    List.iter (fun s -> Printf.bprintf buf "%s\n" s) (rpc_stats dht);
+    let queries = 
["PING",`Ping;"FIND_NODE",`FindNode;"GET_PEERS",`GetPeers;"ANNOUNCE",`Announce] 
in
+    Printf.bprintf buf "Outgoing queries : ok/error/timeout\n";
+    List.iter begin fun (name,qt) ->
+      let get k = try Hashtbl.find dht.M.stats (qt,`Out k) with Not_found -> 0 
in
+      Printf.bprintf buf "%s: %d/%d/%d\n" name (get `Answer) (get `Error) (get 
`Timeout);
+      end queries;
+    Printf.bprintf buf "Incoming queries\n";
+    List.iter begin fun (name,qt) ->
+      let get () = try Hashtbl.find dht.M.stats (qt,`In) with Not_found -> 0 in
+      Printf.bprintf buf "%s: %d\n" name (get ())
+      end queries
+  )
+
diff --git src/networks/bittorrent/bTOptions.ml src/networks/bittorrent/bTOptions.ml
index 1664fec..64064c7 100644
--- src/networks/bittorrent/bTOptions.ml
+++ src/networks/bittorrent/bTOptions.ml
@@ -165,8 +165,8 @@ let get_user_agent () =
   else !!user_agent
 
 let dht_port = define_option bittorrent_section ["dht_port"]
-  "The UDP port to bind the DHT node to (0 to disable)"
-  port_option 12345
+  "DHT port (UDP, set 0 to disable)"
+  port_option (2000 + Random.int 20000)
 
 let use_trackers = define_option bittorrent_section ["use_trackers"]
   "Send announces to trackers"
diff --git src/networks/bittorrent/bT_DHT.ml src/networks/bittorrent/bT_DHT.ml
index 8d6f039..b554e4c 100644
--- src/networks/bittorrent/bT_DHT.ml
+++ src/networks/bittorrent/bT_DHT.ml
@@ -56,6 +56,8 @@ let clear h = Hashtbl.clear h
 
 end
 
+let stats_add h k n = Hashtbl.replace h k (n + try Hashtbl.find h k with 
Not_found -> 0)
+
 module KRPC = struct
 
 type dict = (string * Bencode.value) list
@@ -120,12 +122,26 @@ let udp_set_reader socket f =
 
 module A = Assoc2
 
-let send sock (ip,port as addr) txnmsg =
+let send sock stats (ip,port as addr) txnmsg =
   let s = encode txnmsg in
   if !debug then lprintf_nl "KRPC to %s : %S" (show_addr addr) s;
+  stats_add stats `Sent 1;
+  stats_add stats `SentBytes (String.length s);
   write sock false s ip port
 
-type t = UdpSocket.t * (addr, string, (addr -> dict -> unit) * (unit -> unit) 
* int) A.t
+type stats_key = [ `Timeout | `Sent | `SentBytes | `Recv | `RecvBytes | 
`Decoded | `Handled | `NoTxn ]
+type t =
+  UdpSocket.t *
+  (stats_key, int) Hashtbl.t * 
+  (addr, string, (addr -> dict -> unit) * ([`Error|`Timeout]-> unit) * int) A.t
+let show_stats t =
+  let get k = try Hashtbl.find t k with Not_found -> 0 in
+  [
+    sprintf "rpc recv %d pkts (%d bytes)" (get `Recv) (get `RecvBytes);
+    sprintf "rpc sent %d pkts (%d bytes)" (get `Sent) (get `SentBytes);
+    sprintf "rpc decoded %d, handled %d" (get `Decoded) (get `Handled);
+    sprintf "rpc timeouted %d, orphan %d" (get `Timeout) (get `NoTxn);
+  ]
 
 let create port enabler bw_control answer : t =
   let socket = create Unix.inet_addr_any port (fun sock event ->
@@ -141,15 +157,17 @@ let create port enabler bw_control answer : t =
   set_wtimeout (sock socket) 5.;
   set_rtimeout (sock socket) 5.;
   let h = A.create () in
+  let stats = Hashtbl.create 10 in
   let timeout h =
     let now = last_time () in
     let bad = ref [] in
     let total = ref 0 in
     A.iter h (fun addr txn (_,kerr,t) -> incr total; if t < now then bad := 
(addr,txn,kerr) :: !bad);
     if !debug then lprintf_nl "timeouted %d of %d DHT queries" (List.length 
!bad) !total;
+    stats_add stats `Timeout (List.length !bad);
     List.iter (fun (addr,txn,kerr) ->
       A.remove h addr txn;
-      try kerr () with exn -> if !debug then lprintf_nl ~exn "timeout for %s" 
(show_addr addr)) !bad;
+      try kerr `Timeout with exn -> if !debug then lprintf_nl ~exn "timeout 
for %s" (show_addr addr)) !bad;
   in
   BasicSocket.add_session_timer enabler 5. (fun () -> timeout h);
   let handle addr (txn,ver,msg) =
@@ -159,15 +177,19 @@ let create port enabler bw_control answer : t =
     | Error (code,msg) ->
         if !verb then lprintf_nl "error received from %s%s : %Ld %S" 
(show_addr addr) !!version code msg;
         begin match A.find h addr txn with
-        | None -> if !verb then lprintf_nl "no txn %S for %s%s" txn (show_addr 
addr) !!version
-        | Some (_, kerr, _) -> A.remove h addr txn; kerr ()
+        | None ->
+          stats_add stats `NoTxn 1;
+          if !verb then lprintf_nl "no txn %S for %s%s" txn (show_addr addr) 
!!version
+        | Some (_, kerr, _) -> A.remove h addr txn; kerr `Error
         end
     | Query (name,args) ->
         let ret = answer addr name args in
-        send socket addr (txn, ret)
+        send socket stats addr (txn, ret)
     | Response ret ->
         match A.find h addr txn with
-        | None -> if !verb then lprintf_nl "no txn %S for %s%s" txn (show_addr 
addr) !!version
+        | None ->
+          stats_add stats `NoTxn 1;
+          if !verb then lprintf_nl "no txn %S for %s%s" txn (show_addr addr) 
!!version
         | Some (k,_,_) -> A.remove h addr txn; k addr ret
   in
   let handle p =
@@ -177,13 +199,17 @@ let create port enabler bw_control answer : t =
       let addr = (Ip.of_inet_addr inet_addr, port) in
       let ret = ref None in
       try
+        stats_add stats `RecvBytes (String.length p.udp_content);
+        stats_add stats `Recv 1;
         let r = decode_exn p.udp_content in
+        stats_add stats `Decoded 1;
         ret := Some r;
-        handle addr r
+        handle addr r;
+        stats_add stats `Handled 1;
       with exn ->
         let version = match !ret with Some (_,Some s,_) -> sprintf " client 
%S" s | _ -> "" in
         if !verb then lprintf_nl ~exn "handle packet from %s%s : %S" 
(show_addr addr) version p.udp_content;
-        let error txn code str = send socket addr (txn,(Error (Int64.of_int 
code,str))) in
+        let error txn code str = send socket stats addr (txn,(Error 
(Int64.of_int code,str))) in
         match exn,!ret with
         | Malformed_packet x, Some (txn, _, _)
         | Protocol_error ("",x), Some(txn, _, _) | Protocol_error (txn,x), _ 
-> error txn 203 x
@@ -192,15 +218,15 @@ let create port enabler bw_control answer : t =
         | _ -> ()
   in
   udp_set_reader socket handle;
-  (socket,h)
+  (socket,stats,h)
 
-let shutdown (socket,h) =
+let shutdown (socket,_,h) =
   close socket Closed_by_user;
   A.iter h (fun addr _ (_,kerr,_) ->
-    try kerr () with exn -> if !verb then lprintf_nl ~exn "shutdown for %s" 
(show_addr addr));
+    try kerr `Timeout with exn -> if !verb then lprintf_nl ~exn "shutdown for 
%s" (show_addr addr));
   A.clear h
 
-let write (socket,h) msg addr k ~kerr =
+let write (socket,stats,h) msg addr k ~kerr =
   let tt = Assoc2.find_all h addr in
   let rec loop () = (* choose txn FIXME *)
     let txn = string_of_int (Random.int 1_000_000) in
@@ -210,7 +236,7 @@ let write (socket,h) msg addr k ~kerr =
   in
   let txn = loop () in
   Assoc2.add h addr txn (k,kerr,last_time () + dht_query_timeout);
-  send socket addr (txn,msg)
+  send socket stats addr (txn,msg)
 
 end (* KRPC *)
 
@@ -350,21 +376,33 @@ module Peers = Map.Make(struct type t = addr let compare = compare end)
 
 module M = struct
 
+type query_type = [ `Ping | `FindNode | `GetPeers | `Announce ]
+type answer_type = [ `Answer | `Error | `Timeout ]
+
 type t = {
   rt : Kademlia.table; (* routing table *)
   rpc : KRPC.t; (* KRPC protocol socket *)
   dht_port : int; (* port *)
   torrents : (H.t, int Peers.t) Hashtbl.t; (* torrents announced by other 
peers *)
   enabler : bool ref; (* timers' enabler *)
+  stats : (query_type * [ `In | `Out of answer_type ], int) Hashtbl.t; (* 
statistics *)
 }
 
+let query_type_of_query = function
+| Ping -> `Ping
+| FindNode _ -> `FindNode
+| GetPeers _ -> `GetPeers
+| Announce _ -> `Announce
+
 let dht_query t addr q k ~kerr =
   if !debug then lprintf_nl "DHT query to %s : %s" (show_addr addr) 
(show_query q);
+  let qt = query_type_of_query q in
   KRPC.write t.rpc (make_query t.rt.self q) addr begin fun addr dict ->
-    let (id,r) = try parse_response_exn q dict with exn -> kerr (); raise exn 
in
+    let (id,r) = try parse_response_exn q dict with exn -> stats_add t.stats 
(qt, `Out `Error) 1; kerr (); raise exn in
     if !debug then lprintf_nl "DHT response from %s (%s) : %s" (show_addr 
addr) (show_id id) (show_response r);
+    stats_add t.stats (qt, `Out `Answer) 1;
     k (id,addr) r
-  end ~kerr
+  end ~kerr:(fun reason -> stats_add t.stats (qt, `Out (reason:>answer_type)) 
1; kerr ())
 
 let ping t addr k = dht_query t addr Ping begin fun node r ->
   match r with Ack -> k (Some node)
@@ -407,7 +445,8 @@ let create rt dht_port bw_control answer =
   let rpc = KRPC.create dht_port enabler bw_control answer in
   let torrents = Hashtbl.create 8 in
   manage_timeouts enabler torrents;
-  { rt = rt; rpc = rpc; torrents = torrents; dht_port = dht_port; enabler = 
enabler; }
+  { rt = rt; rpc = rpc; torrents = torrents; dht_port = dht_port; enabler = 
enabler;
+    stats = Hashtbl.create 10; }
 
 let shutdown dht =
   dht.enabler := false;
@@ -579,6 +618,12 @@ let show_torrents torrents =
   torrents
 
 let show dht = show_table dht.M.rt; show_torrents dht.M.torrents
+let stat dht =
+  buckets dht.M.rt,
+  size dht.M.rt,
+  Hashtbl.length dht.M.torrents,
+  Hashtbl.fold (fun _ peers acc -> acc + Peers.fold (fun _ _ acc -> acc + 1) 
peers 0) dht.M.torrents 0
+let rpc_stats dht = let (_,st,_) = dht.M.rpc in KRPC.show_stats st
 
 let bootstrap dht host addr k =
   M.ping dht addr begin function
@@ -642,6 +687,7 @@ let start rt port bw_control =
     let node = (id,addr) in
     if !debug then lprintf_nl "DHT query from %s : %s" (show_node node) 
(show_query q);
     update !!dht Good id addr;
+    stats_add (!!dht).M.stats (M.query_type_of_query q, `In) 1;
     let response =
       match q with
       | Ping -> Ack
diff --git src/networks/bittorrent/kademlia.ml src/networks/bittorrent/kademlia.ml
index 5336449..c6d901f 100644
--- src/networks/bittorrent/kademlia.ml
+++ src/networks/bittorrent/kademlia.ml
@@ -352,6 +352,7 @@ let rec fold f acc = function
   | L b -> f acc b
 
 let size t = fold (fun acc b -> acc + Array.length b.nodes) 0 t.root
+let buckets t = fold (fun acc b -> acc + 1) 0 t.root
 
 (*
 module NoNetwork : Network = struct 
-- 
1.7.2.3




reply via email to

[Prev in Thread] Current Thread [Next in Thread]