# # # patch "database.cc" # from [d3317b33caa87d188423402901d99e6b607fbba0] # to [eb7af6af4e79245613d6b5c5a890c25adcb8ecb0] # # patch "database.hh" # from [eb1090478c14d1eafe33a0ef5b82cf2673ffbdc5] # to [1339421823e22fc72ce673055bf3200a235e1b2f] # # patch "netsync.cc" # from [7c29d867999a70fc04cc646b4cfa94f7f980ad60] # to [3983fb90d601e87ef41f34256f8547980344aee6] # ============================================================ --- database.cc d3317b33caa87d188423402901d99e6b607fbba0 +++ database.cc eb7af6af4e79245613d6b5c5a890c25adcb8ecb0 @@ -2966,7 +2966,14 @@ // transaction guards -transaction_guard::transaction_guard(database & d, bool exclusive) : committed(false), db(d) +transaction_guard::transaction_guard(database & d, bool exclusive, + size_t checkpoint_batch_size, + size_t checkpoint_batch_bytes) + : committed(false), db(d), exclusive(exclusive), + checkpoint_batch_size(checkpoint_batch_size), + checkpoint_batch_bytes(checkpoint_batch_bytes), + checkpointed_calls(0), + checkpointed_bytes(0) { db.begin_transaction(exclusive); } @@ -2980,6 +2987,25 @@ } void +transaction_guard::do_checkpoint() +{ + db.commit_transaction(); + db.begin_transaction(exclusive); + checkpointed_calls = 0; + checkpointed_bytes = 0; +} + +void +transaction_guard::maybe_checkpoint(size_t nbytes) +{ + checkpointed_calls += 1; + checkpointed_bytes += nbytes; + if (checkpointed_calls >= checkpoint_batch_size + || checkpointed_bytes >= checkpoint_batch_bytes) + do_checkpoint(); +} + +void transaction_guard::commit() { committed = true; ============================================================ --- database.hh eb1090478c14d1eafe33a0ef5b82cf2673ffbdc5 +++ database.hh 1339421823e22fc72ce673055bf3200a235e1b2f @@ -454,29 +454,71 @@ }; -// transaction guards nest. acquire one in any scope you'd like -// transaction-protected, and it'll make sure the db aborts a -// txn if there's any exception before you call commit() - -// by default, locks the database exclusively. 
-// if the transaction is intended to be read-only, call with exclusive=False -// in this case, if a database update is attempted and another process is accessing -// the database an exception will be thrown - uglier and more confusing for the user - -// however no data inconsistency should result. +// Transaction guards nest. Acquire one in any scope you'd like +// transaction-protected, and it'll make sure the db aborts a transaction +// if there's any exception before you call commit(). +// +// By default, transaction_guard locks the database exclusively. If the +// transaction is intended to be read-only, construct the guard with +// exclusive=false. In this case, if a database update is attempted and +// another process is accessing the database an exception will be thrown - +// uglier and more confusing for the user - however no data inconsistency +// should result. // -// an exception is thrown if an exclusive transaction_guard is created while -// a non-exclusive transaction_guard exists. +// An exception is thrown if an exclusive transaction_guard is created +// while a non-exclusive transaction_guard exists. +// +// Transaction guards also support splitting long transactions up into +// checkpoints. Any time you feel the database is in an +// acceptably-consistent state, you can call maybe_checkpoint(nn) with a +// given number of bytes. When either the accumulated byte count or the +// number of maybe_checkpoint() calls reaches its limit, the transaction +// is committed and reopened. Any time you feel the database has reached a +// point where you want to force a transaction commit, without destructing the +// object, you can call do_checkpoint(). +// +// This does *not* free you from having to call .commit() on the guard when +// it "completes" its lifecycle. Here's a way to think of checkpointing: a +// normal transaction guard is associated with a program-control +// scope.
Sometimes (notably in netsync) it is not convenient to create a +// scope which exactly matches the size of work-unit you want to commit (a +// bunch of packets, or a session-close, whichever comes first) so +// checkpointing allows you to use a long-lived transaction guard and mark +// off the moments where commits are desired, without destructing the +// guard. The guard still performs an error-management task in case of an +// exception, so you still have to mark it clean before destruction by calling +// .commit(). +// +// Checkpointing also does not override the transaction guard nesting: if +// there's an enclosing transaction_guard, your checkpointing calls have no +// effect. +// +// The purpose of checkpointing is to provide an alternative to "many short +// transactions" on platforms (OSX in particular) where the overhead of +// full commits at high frequency is too high. The solution for these +// platforms is to run inside a longer-lived transaction (session-length), +// and checkpoint at a coarser granularity (every megabyte or so).
class transaction_guard { bool committed; database & db; + bool exclusive; + size_t const checkpoint_batch_size; + size_t const checkpoint_batch_bytes; + size_t checkpointed_calls; + size_t checkpointed_bytes; public: - transaction_guard(database & d, bool exclusive=true); + transaction_guard(database & d, bool exclusive=true, + size_t checkpoint_batch_size=100, + size_t checkpoint_batch_bytes=0xfffff); ~transaction_guard(); + void do_checkpoint(); + void maybe_checkpoint(size_t nbytes); void commit(); }; + void close_all_databases(); ============================================================ --- netsync.cc 7c29d867999a70fc04cc646b4cfa94f7f980ad60 +++ netsync.cc 3983fb90d601e87ef41f34256f8547980344aee6 @@ -374,7 +374,7 @@ bool received_all_items(); bool finished_working(); void maybe_step(); - void maybe_say_goodbye(); + void maybe_say_goodbye(transaction_guard & guard); void note_item_arrived(netcmd_item_type ty, id const & i); void maybe_note_epochs_finished(); @@ -427,7 +427,7 @@ bool process_hello_cmd(rsa_keypair_id const & server_keyname, rsa_pub_key const & server_key, id const & nonce); - bool process_bye_cmd(u8 phase); + bool process_bye_cmd(u8 phase, transaction_guard & guard); bool process_anonymous_cmd(protocol_role role, utf8 const & their_include_pattern, utf8 const & their_exclude_pattern); @@ -455,7 +455,8 @@ bool process_usher_cmd(utf8 const & msg); // The incoming dispatcher. - bool dispatch_payload(netcmd const & cmd); + bool dispatch_payload(netcmd const & cmd, + transaction_guard & guard); // Various helpers. void respond_to_confirm_cmd(); @@ -464,7 +465,7 @@ void send_all_data(netcmd_item_type ty, set const & items); void begin_service(); - bool process(); + bool process(transaction_guard & guard); }; @@ -1588,7 +1589,8 @@ } bool -session::process_bye_cmd(u8 phase) +session::process_bye_cmd(u8 phase, + transaction_guard & guard) { // Ideal shutdown @@ -1600,9 +1602,11 @@ // server: S_WORKING // 0. [refinement, data, deltas, etc.] 
// client: C_SHUTDOWN +// (client checkpoints here) // 1. client -> "bye 0" // 2. "bye 0" -> server // server: S_SHUTDOWN +// (server checkpoints here) // 3. "bye 1" <- server // 4. client <- "bye 1" // client: C_CONFIRMED @@ -1629,6 +1633,7 @@ protocol_state == working_state) { protocol_state = shutdown_state; + guard.do_checkpoint(); queue_bye_cmd(1); } else @@ -2067,7 +2072,8 @@ } bool -session::dispatch_payload(netcmd const & cmd) +session::dispatch_payload(netcmd const & cmd, + transaction_guard & guard) { switch (cmd.get_cmd_code()) @@ -2098,7 +2104,7 @@ { u8 phase; cmd.read_bye_cmd(phase); - return process_bye_cmd(phase); + return process_bye_cmd(phase, guard); } break; @@ -2276,13 +2282,14 @@ } void -session::maybe_say_goodbye() +session::maybe_say_goodbye(transaction_guard & guard) { if (voice == client_voice && protocol_state == working_state && finished_working()) { protocol_state = shutdown_state; + guard.do_checkpoint(); queue_bye_cmd(0); } } @@ -2304,7 +2311,7 @@ return armed; } -bool session::process() +bool session::process(transaction_guard & guard) { if (encountered_error) return true; @@ -2313,19 +2320,18 @@ if (!arm()) return true; - transaction_guard guard(app.db); - armed = false; L(F("processing %d byte input buffer from peer %s\n") % inbuf.size() % peer_id); - bool ret = dispatch_payload(cmd); + size_t sz = cmd.encoded_size(); + bool ret = dispatch_payload(cmd, guard); if (inbuf.size() >= constants::netcmd_maxsz) W(F("input buffer for peer %s is overfull " "after netcmd dispatch\n") % peer_id); - guard.commit(); + guard.maybe_checkpoint(sz); if (!ret) L(F("finishing processing with '%d' packet") @@ -2357,6 +2363,9 @@ Netxx::port_type default_port, unsigned long timeout_seconds) { + + transaction_guard guard(app.db); + Netxx::Probe probe; Netxx::Timeout timeout(static_cast(timeout_seconds)), instant(0,1); #ifdef USE_IPV6 @@ -2387,7 +2396,7 @@ } sess.maybe_step(); - sess.maybe_say_goodbye(); + sess.maybe_say_goodbye(guard); probe.clear(); 
probe.add(sess.str, sess.which_events()); @@ -2418,8 +2427,11 @@ } if (armed) - if (!sess.process()) + if (!sess.process(guard)) { + // Commit whatever work we managed to accomplish anyways. + guard.commit(); + // We failed during processing. This should only happen in // client voice when we have a decode exception, or received an // error from our server (which is translated to a decode @@ -2432,6 +2444,9 @@ if (!all_io_clean) { + // Commit whatever work we managed to accomplish anyways. + guard.commit(); + // We had an I/O error. We must decide if this represents a // user-reported error or a clean disconnect. See protocol // state diagram in session::process_bye_cmd. @@ -2467,7 +2482,6 @@ i != sessions.end(); ++i) { i->second->maybe_step(); - i->second->maybe_say_goodbye(); try { if (i->second->arm()) @@ -2605,7 +2619,8 @@ static void process_armed_sessions(map > & sessions, - set & armed_sessions) + set & armed_sessions, + transaction_guard & guard) { for (set::const_iterator i = armed_sessions.begin(); i != armed_sessions.end(); ++i) @@ -2617,7 +2632,7 @@ else { shared_ptr sess = j->second; - if (!sess->process()) + if (!sess->process(guard)) { P(F("peer %s processing finished, disconnecting\n") % sess->peer_id); @@ -2694,6 +2709,8 @@ map > sessions; set armed_sessions; + shared_ptr guard; + while (true) { probe.clear(); @@ -2713,6 +2730,11 @@ : instant)); Netxx::Probe::ready_type event = res.second; Netxx::socket_type fd = res.first; + + if (!guard) + guard = shared_ptr(new transaction_guard(app.db)); + + I(guard); if (fd == -1) { @@ -2756,8 +2778,15 @@ } } } - process_armed_sessions(sessions, armed_sessions); + process_armed_sessions(sessions, armed_sessions, *guard); reap_dead_sessions(sessions, timeout_seconds); + + if (sessions.empty()) + { + // Let the guard die completely if everything's gone quiet. 
+ guard->commit(); + guard.reset(); + } } } @@ -2965,21 +2994,9 @@ else { I(voice == client_voice); - - // FIXME: this transaction guard is disabled, to make the - // client's pull "resumable" if the client hits ctrl-C. We are - // not sure this is a good optimization; some platforms (OSX?) - // are apparantly very grumpy if they do a lot of - // commits. Profile and check, possibly re-enable conditionally - // or with some form of commit-batching. - - //transaction_guard guard(app.db); - call_server(role, include_pattern, exclude_pattern, app, addr, static_cast(constants::netsync_default_port), static_cast(constants::netsync_timeout_seconds)); - - //guard.commit(); } } catch (Netxx::NetworkException & e)