# # # patch "netsync.cc" # from [8f8efc71ddee66d028c6109eb91818566d926a1a] # to [24bd0f42d6023df76088fba26e622f2be1aaf3bf] # # patch "packet.cc" # from [6c8855a5184c4abb4485e315e3e28e848f5bb564] # to [50c3d69fe2429f1e334a8d2dc675602d9e4d5400] # # patch "packet.hh" # from [41eff9ec50dfb14b240cfbf962d95c84bf5474da] # to [35b466b5467cb8b2cb27d768b6217dc21a10977f] # ============================================================ --- netsync.cc 8f8efc71ddee66d028c6109eb91818566d926a1a +++ netsync.cc 24bd0f42d6023df76088fba26e622f2be1aaf3bf @@ -67,10 +67,7 @@ // its merkle trie. // -- add some sort of vhost field to the client's first packet, saying who // they expect to talk to -// -- connection teardown is flawed: -// -- simple bug: often connections "fail" even though they succeeded. -// should figure out why. (Possibly one side doesn't wait for their -// goodbye packet to drain before closing the socket?) +// -- connection teardown is still a bit unpleasant: // -- subtle misdesign: "goodbye" packets indicate completion of data // transfer. they do not indicate that data has been written to // disk. there should be some way to indicate that data has been @@ -81,57 +78,49 @@ // switch to using that. // -// this is the "new" network synchronization (netsync) system in -// monotone. it is based on synchronizing a pair of merkle trees over an +// This is the "new" network synchronization (netsync) system in +// monotone. It is based on synchronizing pairs of merkle trees over an // interactive connection. // -// a netsync process between peers treats each peer as either a source, a -// sink, or both. when a peer is only a source, it will not write any new +// A netsync process between peers treats each peer as either a source, a +// sink, or both. When a peer is only a source, it will not write any new // items to its database. when a peer is only a sink, it will not send any -// items from its database. 
when a peer is both a source and sink, it may +// items from its database. When a peer is both a source and sink, it may // send and write items freely. // -// the post-state of a netsync is that each sink contains a superset of the +// The post-state of a netsync is that each sink contains a superset of the // items in its corresponding source; when peers are behaving as both // source and sink, this means that the post-state of the sync is for the // peers to have identical item sets. // -// a peer can be a sink in at most one netsync process at a time; it can -// however be a source for multiple netsyncs simultaneously. // -// -// data structure +// Data structure // -------------- // -// each node in a merkle tree contains a fixed number of slots. this number +// Each node in a merkle tree contains a fixed number of slots. this number // is derived from a global parameter of the protocol -- the tree fanout -- -// such that the number of slots is 2^fanout. for now we will assume that +// such that the number of slots is 2^fanout. For now we will assume that // fanout is 4 thus there are 16 slots in a node, because this makes -// illustration easier. the other parameter of the protocol is the size of +// illustration easier. The other parameter of the protocol is the size of // a hash; we use SHA1 so the hash is 20 bytes (160 bits) long. // -// each slot in a merkle tree node is in one of 4 states: +// Each slot in a merkle tree node is in one of 3 states: // // - empty -// - live leaf -// - dead leaf +// - leaf // - subtree // -// in addition, each live or dead leaf contains a hash code which -// identifies an element of the set being synchronized. each subtree slot -// contains a hash code of the node immediately beneath it in the merkle -// tree. empty slots contain no hash codes. +// In addition, each leaf contains a hash code which identifies an element +// of the set being synchronized. 
Each subtree slot contains a hash code of +// the node immediately beneath it in the merkle tree. Empty slots contain +// no hash codes. // -// each node also summarizes, for sake of statistic-gathering, the number -// of set elements and total number of bytes in all of its subtrees, each -// stored as a size_t and sent as a uleb128. -// -// since empty slots have no hash code, they are represented implicitly by -// a bitmap at the head of each merkle tree node. as an additional +// Since empty slots have no hash code, they are represented implicitly by +// a bitmap at the head of each merkle tree node. As an additional // integrity check, each merkle tree node contains a label indicating its // prefix in the tree, and a hash of its own contents. // -// in total, then, the byte-level representation of a <160,4> merkle tree +// In total, then, the byte-level representation of a <160,4> merkle tree // node is as follows: // // 20 bytes - hash of the remaining bytes in the node @@ -143,35 +132,31 @@ // 4 bytes - slot-state bitmap of the node // 0-320 bytes - between 0 and 16 live slots in the node // -// so, in the worst case such a node is 367 bytes, with these parameters. +// So, in the worst case such a node is 367 bytes, with these parameters. // // -// protocol +// Protocol // -------- // -// The protocol is a simple binary command-packet system over TCP; -// each packet consists of a single byte which identifies the protocol -// version, a byte which identifies the command name inside that -// version, a size_t sent as a uleb128 indicating the length of the -// packet, that many bytes of payload, and finally 20 bytes of SHA-1 -// HMAC calculated over the payload. The key for the SHA-1 HMAC is 20 -// bytes of 0 during authentication, and a 20-byte random key chosen -// by the client after authentication (discussed below). -// decoding involves simply buffering until a sufficient number of bytes are -// received, then advancing the buffer pointer. 
any time an integrity check -// (the HMAC) fails, the protocol is assumed to have lost synchronization, and -// the connection is dropped. the parties are free to drop the tcp stream at -// any point, if too much data is received or too much idle time passes; no -// commitments or transactions are made. +// The protocol is a binary command-packet system over TCP; each packet +// consists of a single byte which identifies the protocol version, a byte +// which identifies the command name inside that version, a size_t sent as +// a uleb128 indicating the length of the packet, that many bytes of +// payload, and finally 20 bytes of SHA-1 HMAC calculated over the payload. +// The key for the SHA-1 HMAC is 20 bytes of 0 during authentication, and a +// 20-byte random key chosen by the client after authentication (discussed +// below). Decoding involves simply buffering until a sufficient number of +// bytes are received, then advancing the buffer pointer. Any time an +// integrity check (the HMAC) fails, the protocol is assumed to have lost +// synchronization, and the connection is dropped. The parties are free to +// drop the TCP stream at any point, if too much data is received or too +// much idle time passes; no commitments or transactions are made. // -// one special command, "bye", is used to shut down a connection -// gracefully. once each side has received all the data they want, they -// can send a "bye" command to the other side. as soon as either side has -// both sent and received a "bye" command, they drop the connection. if -// either side sees an i/o failure (dropped connection) after they have -// sent a "bye" command, they consider the shutdown successful. // -// the exchange begins in a non-authenticated state. the server sends a +// Authentication and setup +// ------------------------ +// +// The exchange begins in a non-authenticated state. 
The server sends a // "hello " command, which identifies the server's RSA key and // issues a nonce which must be used for a subsequent authentication. // @@ -191,44 +176,81 @@ // key). // // The server then replies with a "confirm" command, which contains no -// other data but will only have the correct HMAC integrity code if -// the server received and properly decrypted the HMAC key offered by -// the client. This transitions the peers into an authenticated state -// and begins refinement. +// other data but will only have the correct HMAC integrity code if the +// server received and properly decrypted the HMAC key offered by the +// client. This transitions the peers into an authenticated state and +// begins epoch refinement. If epoch refinement and epoch transmission +// succeed, the peers switch to data refinement and data transmission. // -// refinement begins with the client sending its root public key and -// manifest certificate merkle nodes to the server. the server then -// compares the root to each slot in *its* root node, and for each slot -// either sends refined subtrees to the client, or (if it detects a missing -// item in one pattern or the other) sends either "data" or "send_data" -// commands corresponding to the role of the missing item (source or -// sink). the client then receives each refined subtree and compares it -// with its own, performing similar description/request behavior depending -// on role, and the cycle continues. // -// detecting the end of refinement is subtle: after sending the refinement -// of the root node, the server sends a "done 0" command (queued behind all -// the other refinement traffic). when either peer receives a "done N" -// command it immediately responds with a "done N+1" command. when two done -// commands for a given merkle tree arrive with no interveining refinements, -// the entire merkle tree is considered complete. 
+// Refinement +// ---------- // -// once a response is received for each requested key and revision cert -// (either data or nonexistant) the requesting party walks the graph of -// received revision certs and transmits send_data or send_delta commands -// for all the revisions mentionned in the certs which it does not already -// have in its database. +// Refinement is executed by "refiners"; there is a refiner for each +// set of 'items' being exchanged: epochs, keys, certs, and revisions. +// When refinement starts, each party knows only their own set of items; +// when refinement completes, each party has learned of the complete set +// of items in its peer. The self-set and peer-set can then be used to +// calculate the set of items to send during the following transmission +// phase. // -// for each revision it receives, the recipient requests all the file data or -// deltas described in that revision. +// Each refinement phase begins with a transmission of the root merkle node +// in each refiner. When a refiner receives a node, it compares the +// corresponding node in its tree to the node received. Depending on the +// slot-by-slot comparisons (there are 12 cases, see refiner.cc), the +// refiner will respond with either a sub-node, a note about a leaf, a note +// about a shared subtree, or nothing. // -// once all requested files, manifests, revisions and certs are received (or -// noted as nonexistant), the recipient closes its connection. +// Detecting the end of refinement is a bit subtle: after sending the +// refinement of the root node, a refiner sends a "done 0" command (queued +// behind all the other refinement traffic). When either peer receives a +// "done N" command it immediately responds with a "done N+1" command. When +// two done commands for a given merkle tree arrive with no intervening +// refinements, the entire merkle tree is considered complete. 
// -// (aside: this protocol is raw binary because coding density is actually -// important here, and each packet consists of very information-dense -// material that you wouldn't have a hope of typing in manually anyways) +// +// Transmission +// ------------ // +// Once the set of items to send has been determined (for keys, certs, and +// revisions) each peer switches into a transmission mode. This mode +// involves walking the revision graph in ancestry-order and sending all +// the items the local peer has which the remote one does not. Since the +// remote and local peers both know all the items which need to be +// transferred (they learned during refinement) they know what to wait for +// and what to send. The mechanisms of the transmission phase (notably, +// enumerator.cc) simply ensure that things are sent in the proper order, +// and without over-filling the output buffer too much. +// +// +// Shutdown +// -------- +// +// After transmission completes, one special command, "bye", is used to +// shut down a connection gracefully. The shutdown sequence based on "bye" +// commands is documented below in session::process_bye_cmd. +// +// +// Note on epochs +// -------------- +// +// One refinement and transmission phase precedes all the others: epochs. +// Epochs are exchanged and compared in order to be sure that further +// refinement and transmission (on certs and revisions) makes sense; they +// are a sort of "immune system" to prevent incompatible databases (say +// between rebuilds due to bugs in monotone) from cross-contaminating. The +// later refinements are only kicked off *after* all epochs are received +// and compare correctly. +// +// +// Note on dense coding +// -------------------- +// +// This protocol is "raw binary" (non-text) because coding density is +// actually important here, and each packet consists of very +// information-dense material that you wouldn't have a hope of typing in or +// interpreting manually anyways. 
+// using namespace std; using boost::shared_ptr; @@ -296,7 +318,7 @@ vector written_certs; id saved_nonce; - packet_db_valve dbw; + packet_db_writer dbw; enum { @@ -316,7 +338,7 @@ // Interface to ancestry grovelling. revision_enumerator rev_enumerator; - // enumerator_callbacks methods. + // Enumerator_callbacks methods. bool process_this_rev(revision_id const & rev); bool queue_this_cert(hexenc const & c); void note_file_data(file_id const & f); @@ -774,9 +796,16 @@ if (!epoch_refiner.done()) return; - // But otherwise, we're ready to go! - L(F("all epochs processed, opening database valve\n")); - this->dbw.open_valve(); + // If we ran into an error -- say a mismatched epoch -- don't do any + // further refinements. + if (encountered_error) + return; + + // But otherwise, we're ready to go. Start the next + // set of refinements. + key_refiner.begin_refinement(); + cert_refiner.begin_refinement(); + rev_refiner.begin_refinement(); } void @@ -1271,20 +1300,20 @@ utf8 const & their_include_pattern, utf8 const & their_exclude_pattern) { - // - // internally netsync thinks in terms of sources and sinks. users like + // Internally netsync thinks in terms of sources and sinks. Users like // thinking of repositories as "readonly", "readwrite", or "writeonly". // - // we therefore use the read/write terminology when dealing with the UI: + // We therefore use the read/write terminology when dealing with the UI: // if the user asks to run a "read only" service, this means they are // willing to be a source but not a sink. // - // nb: the "role" here is the role the *client* wants to play + // nb: The "role" here is the role the *client* wants to play // so we need to check that the opposite role is allowed for us, // in our this->role field. // - // client must be a sink and server must be a source (anonymous read-only) + // Client must be a sink and server must be a source (anonymous + // read-only). 
if (role != sink_role) { @@ -1346,7 +1375,7 @@ get_branches(app, branchnames); globish_matcher their_matcher(their_include_pattern, their_exclude_pattern); - // check that they replied with the nonce we asked for + // Check that they replied with the nonce we asked for. if (!(nonce1 == this->saved_nonce)) { W(F("detected replay attack in auth netcmd\n")); @@ -1354,24 +1383,21 @@ return false; } - - // - // internally netsync thinks in terms of sources and sinks. users like + // Internally netsync thinks in terms of sources and sinks. users like // thinking of repositories as "readonly", "readwrite", or "writeonly". // - // we therefore use the read/write terminology when dealing with the UI: + // We therefore use the read/write terminology when dealing with the UI: // if the user asks to run a "read only" service, this means they are // willing to be a source but not a sink. // - // nb: the "their_role" here is the role the *client* wants to play + // nb: The "their_role" here is the role the *client* wants to play // so we need to check that the opposite role is allowed for us, // in our this->role field. - // if (!app.db.public_key_exists(their_key_hash)) { - // if it's not in the db, it still could be in the keystore if we - // have the private key that goes with it + // If it's not in the db, it still could be in the keystore if we + // have the private key that goes with it. if (!app.keys.try_ensure_in_db(their_key_hash)) { W(F("remote public key hash '%s' is unknown\n") % their_key_hash); @@ -1380,12 +1406,12 @@ } } - // get their public key + // Get their public key. rsa_keypair_id their_id; base64 their_key; app.db.get_pubkey(their_key_hash, their_id, their_key); - // client as sink, server as source (reading) + // Client as sink, server as source (reading). 
if (their_role == sink_role || their_role == source_and_sink_role) { @@ -1415,12 +1441,12 @@ } } - //if we're source_and_sink_role, continue even with no branches readable - //ex: serve --db=empty.db + // If we're source_and_sink_role, continue even with no branches readable + // eg. serve --db=empty.db P(F("allowed '%s' read permission for '%s' excluding '%s'\n") % their_id % their_include_pattern % their_exclude_pattern); - // client as source, server as sink (writing) + // Client as source, server as sink (writing). if (their_role == source_role || their_role == source_and_sink_role) { @@ -1446,28 +1472,31 @@ rebuild_merkle_trees(app, ok_branches); - // save their identity + // Save their identity. this->remote_peer_key_hash = client; - // check the signature + // Check the signature. base64 sig; encode_base64(rsa_sha1_signature(signature), sig); if (check_signature(app, their_id, their_key, nonce1(), sig)) { - // get our private key and sign back + // Get our private key and sign back. L(F("client signature OK, accepting authentication\n")); this->authenticated = true; this->remote_peer_key_name = their_id; - // assume the (possibly degraded) opposite role + + // Assume the (possibly degraded) opposite role. switch (their_role) { case source_role: I(this->role != source_role); this->role = sink_role; break; + case source_and_sink_role: I(this->role == source_and_sink_role); break; + case sink_role: I(this->role != sink_role); this->role = source_role; @@ -1491,16 +1520,16 @@ hexenc their_key_hash; encode_hexenc(id(remote_peer_key_hash), their_key_hash); - // nb. this->role is our role, the server is in the opposite role + // nb. this->role is our role, the server is in the opposite role. L(F("received 'confirm' netcmd from server '%s' for pattern '%s' exclude '%s' in %s mode\n") % their_key_hash % our_include_pattern % our_exclude_pattern % (this->role == source_and_sink_role ? _("source and sink") : (this->role == source_role ? 
_("sink") : _("source")))); - // check their signature + // Check their signature. if (app.db.public_key_exists(their_key_hash)) { - // get their public key and check the signature + // Get their public key and check the signature. rsa_keypair_id their_id; base64 their_key; app.db.get_pubkey(their_key_hash, their_id, their_key); @@ -1730,9 +1759,6 @@ session::respond_to_confirm_cmd() { epoch_refiner.begin_refinement(); - key_refiner.begin_refinement(); - cert_refiner.begin_refinement(); - rev_refiner.begin_refinement(); } static bool @@ -1851,8 +1877,6 @@ hexenc hitem; encode_hexenc(item, hitem); - // it's ok if we received something we didn't ask for; it might - // be a spontaneous transmission from refinement note_item_arrived(type, item); switch (type) @@ -1881,11 +1905,12 @@ else { L(F("branch %s already has an epoch; checking\n") % branch); - // if we get here, then we know that the epoch must be + // If we get here, then we know that the epoch must be // different, because if it were the same then the - // if(epoch_exists()) branch up above would have been taken. if - // somehow this is wrong, then we have broken epoch hashing or - // something, which is very dangerous, so play it safe... + // if (epoch_exists()) branch up above would have been taken. + // if somehow this is wrong, then we have broken epoch + // hashing or something, which is very dangerous, so play it + // safe... I(!(i->second == epoch)); // It is safe to call 'error' here, because if we get here, @@ -2219,13 +2244,13 @@ break; case usher_reply_cmd: - return false;// should not happen + return false; // Should not happen. break; } return false; } -// this kicks off the whole cascade starting from "hello" +// This kicks off the whole cascade starting from "hello". void session::begin_service() { @@ -2262,8 +2287,9 @@ { if (!armed) { + // Don't pack the buffer unnecessarily. 
if (outbuf_size > constants::bufsz * 10) - return false; // don't pack the buffer unnecessarily + return false; if (cmd.read(inbuf, read_hmac)) { @@ -2283,20 +2309,28 @@ return true; transaction_guard guard(app.db); + armed = false; - L(F("processing %d byte input buffer from peer %s\n") % inbuf.size() % peer_id); + L(F("processing %d byte input buffer from peer %s\n") + % inbuf.size() % peer_id); + bool ret = dispatch_payload(cmd); + if (inbuf.size() >= constants::netcmd_maxsz) - W(F("input buffer for peer %s is overfull after netcmd dispatch\n") % peer_id); + W(F("input buffer for peer %s is overfull " + "after netcmd dispatch\n") % peer_id); + guard.commit(); if (!ret) - L(F("finishing processing with '%d' packet") % cmd.get_cmd_code()); + L(F("finishing processing with '%d' packet") + % cmd.get_cmd_code()); return ret; } catch (bad_decode & bd) { - W(F("protocol error while processing peer %s: '%s'\n") % peer_id % bd.what); + W(F("protocol error while processing peer %s: '%s'\n") + % peer_id % bd.what); return false; } catch (netsync_error & err) @@ -2304,7 +2338,7 @@ W(F("error: %s\n") % err.msg); queue_error_cmd(err.msg); encountered_error = true; - return true; // don't terminate until we've send the error_cmd + return true; // Don't terminate until we've sent the error_cmd. } } @@ -2586,12 +2620,12 @@ reap_dead_sessions(map > & sessions, unsigned long timeout_seconds) { - // kill any clients which haven't done any i/o inside the timeout period - // or who have exchanged all items and flushed their output buffers + // Kill any clients which haven't done any i/o inside the timeout period + // or who have exchanged all items and flushed their output buffers. 
set dead_clients; time_t now = ::time(NULL); - for (map >::const_iterator i = sessions.begin(); - i != sessions.end(); ++i) + for (map >::const_iterator + i = sessions.begin(); i != sessions.end(); ++i) { if (static_cast(i->second->last_io_time + timeout_seconds) < static_cast(now)) @@ -2637,7 +2671,8 @@ Netxx::StreamServer server(addr, timeout); const char *name = addr.get_name(); P(F("beginning service on %s : %s\n") - % (name != NULL ? name : "all interfaces") % lexical_cast(addr.get_port())); + % (name != NULL ? name : "all interfaces") + % lexical_cast(addr.get_port())); map > sessions; set armed_sessions; @@ -2648,7 +2683,8 @@ armed_sessions.clear(); if (sessions.size() >= session_limit) - W(F("session limit %d reached, some connections will be refused\n") % session_limit); + W(F("session limit %d reached, some connections " + "will be refused\n") % session_limit); else probe.add(server); @@ -2671,7 +2707,8 @@ // we either got a new connection else if (fd == server) handle_new_connection(addr, server, timeout, role, - include_pattern, exclude_pattern, sessions, app); + include_pattern, exclude_pattern, + sessions, app); // or an existing session woke up else @@ -2688,7 +2725,8 @@ bool live_p = true; if (event & Netxx::Probe::ready_read) - handle_read_available(fd, sess, sessions, armed_sessions, live_p); + handle_read_available(fd, sess, sessions, + armed_sessions, live_p); if (live_p && (event & Netxx::Probe::ready_write)) handle_write_available(fd, sess, sessions, live_p); @@ -2707,13 +2745,6 @@ } -///////////////////////////////////////////////// -// -// layer 4: monotone interface layer -// -///////////////////////////////////////////////// - - void insert_with_parents(revision_id rev, refiner & ref, @@ -2773,7 +2804,7 @@ { if(branchnames.find(names[i]) != branchnames.end()) { - // branch matches, get its certs + // Branch matches, get its certs. 
vector< revision > certs; base64 encoded_name; encode_base64(cert_value(names[i]),encoded_name); @@ -2783,7 +2814,7 @@ { insert_with_parents(revision_id(j->inner().ident), rev_refiner, revision_ids, app, revisions_ticker); - // branch certs go in here, others later on + // Branch certs go in here, others later on. hexenc tmp; id item; cert_hash_code(j->inner(), tmp); @@ -2807,14 +2838,16 @@ cert_value branch((*i)()); std::map::const_iterator j; j = epochs.find(branch); - // set to zero any epoch which is not yet set + + // Set to zero any epoch which is not yet set. if (j == epochs.end()) { L(F("setting epoch on %s to zero\n") % branch); epochs.insert(std::make_pair(branch, epoch_zero)); app.db.set_epoch(branch, epoch_zero); } - // then insert all epochs into merkle tree + + // Then insert all epochs into merkle tree. j = epochs.find(branch); I(j != epochs.end()); epoch_id eid; @@ -2832,8 +2865,9 @@ cert_idx idx; app.db.get_revision_cert_nobranch_index(idx); - // insert all non-branch certs reachable via these revisions - // (branch certs were inserted earlier) + // Insert all non-branch certs reachable via these revisions + // (branch certs were inserted earlier). + for (cert_idx::const_iterator i = idx.begin(); i != idx.end(); ++i) { hexenc const & hash = i->first; @@ -2852,8 +2886,9 @@ } } - // add any keys specified on the command line - for (vector::const_iterator key = app.keys_to_push.begin(); + // Add any keys specified on the command line. + for (vector::const_iterator key + = app.keys_to_push.begin(); key != app.keys_to_push.end(); ++key) { if (inserted_keys.find(*key) == inserted_keys.end()) @@ -2868,7 +2903,8 @@ inserted_keys.insert(*key); } } - // insert all the keys + + // Insert all the keys. 
for (set::const_iterator key = inserted_keys.begin(); key != inserted_keys.end(); key++) { @@ -2928,4 +2964,3 @@ throw oops((F("network error: %s") % e.what()).str());; } } - ============================================================ --- packet.cc 6c8855a5184c4abb4485e315e3e28e848f5bb564 +++ packet.cc 50c3d69fe2429f1e334a8d2dc675602d9e4d5400 @@ -6,10 +6,8 @@ #include #include -#include #include #include -#include #include "app_state.hh" #include "cset.hh" @@ -26,220 +24,6 @@ using boost::match_results; using boost::regex; -// --- packet db writer -- -// -// FIXME: this comment is out of date, and untrustworthy. -// -// the packet_db_writer::impl class (see below) manages writes to the -// database. it also ensures that those writes follow the semantic -// dependencies implied by the objects being written. -// -// an incoming manifest delta has three states: -// -// when it is first received, it is (probably) "non-constructable". -// this means that we do not have a way of building its preimage from either -// the database or from the memory cache of deltas we keep in this class -// -// a non-constructable manifest delta is given a prerequisite of -// constructibility on its preimage. -// -// when the preimage becomes constructable, the manifest delta (probably) -// changes to "non-writable" state. this means that we have a way to build -// the manifest, we haven't received all the files which depend on it yet, -// so we won't write it to the database. -// -// when a manifest becomes constructable (but not necessarily writable) we -// call an analyzer back, if we have one, with the pre- and post-states of -// the delta. this happens in order to give the netsync layer a chance to -// request all the file deltas which accompany the manifest delta. -// -// a non-writable manifest delta is given prerequisites on all its -// non-existing underlying files, and delayed again. -// -// when all the files arrive, a non-writable manifest is written to the -// database. 
-// -// files are delayed to depend on their preimage, like non-constructable -// manifests. however, once they are constructable they are immediately -// written to the database. -// -///////////////////////////////////////////////////////////////// -// -// how it's done: -// -// each manifest or file has a companion class called a "prerequisite". a -// prerequisite has a set of delayed packets which depend on it. these -// delayed packets are also called dependents. a prerequisite can either be -// "unsatisfied" or "satisfied". when it is first constructed, it is -// unsatisfied. when it is satisfied, it calls all its dependents to inform -// them that it has become satisfied. -// -// when all the prerequisites of a given dependent is satisfied, the -// dependent writes itself back to the db writer. the dependent is then -// dead, and the prerequisite will forget about it. -// -// dependents' lifetimes are managed by prerequisites. when all -// prerequisites forget about their dependents, the dependent is destroyed -// (it is reference counted with a shared pointer). similarly, the -// packet_db_writer::impl holds references to prerequisites, and when -// a prerequisite no longer has any dependents, it is dropped from the -// packet_db_writer::impl, destroying it. -// -///////////////////////////////////////////////////////////////// -// -// this same machinery is also re-used for the "valved" packet writer, as a -// convenient way to queue up commands in memory while the valve is closed. -// in this usage, we simply never add any prerequisites to any packet, and -// just call apply_delayed_packet when the valve opens. 
- -struct -delayed_packet -{ - virtual void apply_delayed_packet(packet_db_writer & pw) = 0; - virtual ~delayed_packet() {} -}; - -// concrete delayed packets - -class -delayed_revision_data_packet - : public delayed_packet -{ - revision_id ident; - revision_data dat; -public: - delayed_revision_data_packet(revision_id const & i, - revision_data const & md) - : ident(i), dat(md) - {} - virtual void apply_delayed_packet(packet_db_writer & pw); -}; - - -class -delayed_file_data_packet - : public delayed_packet -{ - file_id ident; - file_data dat; -public: - delayed_file_data_packet(file_id const & i, - file_data const & fd) - : ident(i), dat(fd) - {} - virtual void apply_delayed_packet(packet_db_writer & pw); -}; - -class -delayed_file_delta_packet - : public delayed_packet -{ - file_id old_id; - file_id new_id; - file_delta del; - bool forward_delta; - bool write_full; -public: - delayed_file_delta_packet(file_id const & oi, - file_id const & ni, - file_delta const & md, - bool fwd, - bool full = false) - : old_id(oi), new_id(ni), del(md), forward_delta(fwd), write_full(full) - {} - virtual void apply_delayed_packet(packet_db_writer & pw); -}; - - -class -delayed_revision_cert_packet - : public delayed_packet -{ - revision c; -public: - delayed_revision_cert_packet(revision const & c) - : c(c) - {} - virtual void apply_delayed_packet(packet_db_writer & pw); -}; - -class -delayed_public_key_packet - : public delayed_packet -{ - rsa_keypair_id id; - base64 key; -public: - delayed_public_key_packet(rsa_keypair_id const & id, - base64 key) - : id(id), key(key) - {} - virtual void apply_delayed_packet(packet_db_writer & pw); -}; - -class -delayed_keypair_packet - : public delayed_packet -{ - rsa_keypair_id id; - keypair kp; -public: - delayed_keypair_packet(rsa_keypair_id const & id, - keypair const & kp) - : id(id), kp(kp) - {} - virtual void apply_delayed_packet(packet_db_writer & pw); -}; - -void -delayed_revision_data_packet::apply_delayed_packet(packet_db_writer & 
pw) -{ - L(F("writing delayed revision data packet for %s\n") % ident); - pw.consume_revision_data(ident, dat); -} - -void -delayed_file_data_packet::apply_delayed_packet(packet_db_writer & pw) -{ - L(F("writing delayed file data packet for %s\n") % ident); - pw.consume_file_data(ident, dat); -} - -void -delayed_file_delta_packet::apply_delayed_packet(packet_db_writer & pw) -{ - L(F("writing delayed file %s packet for %s -> %s%s\n") - % (forward_delta ? "delta" : "reverse delta") - % (forward_delta ? old_id : new_id) - % (forward_delta ? new_id : old_id) - % (write_full ? " (writing in full)" : "")); - if (forward_delta) - pw.consume_file_delta(old_id, new_id, del, write_full); - else - pw.consume_file_reverse_delta(new_id, old_id, del); -} - -void -delayed_revision_cert_packet::apply_delayed_packet(packet_db_writer & pw) -{ - L(F("writing delayed revision cert on %s\n") % c.inner().ident); - pw.consume_revision_cert(c); -} - -void -delayed_public_key_packet::apply_delayed_packet(packet_db_writer & pw) -{ - L(F("writing delayed public key %s\n") % id()); - pw.consume_public_key(id, key); -} - -void -delayed_keypair_packet::apply_delayed_packet(packet_db_writer & pw) -{ - L(F("writing delayed private key %s\n") % id()); - pw.consume_key_pair(id, kp); -} - void packet_consumer::set_on_revision_written(boost::function1 const & x) @@ -524,147 +308,6 @@ guard.commit(); } - -// --- valved packet writer --- - -struct packet_db_valve::impl -{ - packet_db_writer writer; - std::vector< boost::shared_ptr > packets; - bool valve_is_open; - impl(app_state & app, bool take_keys) - : writer(app, take_keys), - valve_is_open(false) - {} - void do_packet(boost::shared_ptr packet) - { - if (valve_is_open) - packet->apply_delayed_packet(writer); - else - packets.push_back(packet); - } -}; - -packet_db_valve::packet_db_valve(app_state & app, bool take_keys) - : pimpl(new impl(app, take_keys)) -{} - -packet_db_valve::~packet_db_valve() -{} - -void -packet_db_valve::open_valve() -{ - 
L(F("packet valve opened\n")); - pimpl->valve_is_open = true; - int written = 0; - for (std::vector< boost::shared_ptr >::reverse_iterator - i = pimpl->packets.rbegin(); - i != pimpl->packets.rend(); - ++i) - { - pimpl->do_packet(*i); - ++written; - } - pimpl->packets.clear(); - L(F("wrote %i queued packets\n") % written); -} - -#define DOIT(x) pimpl->do_packet(boost::shared_ptr(new x)); - -void -packet_db_valve::set_on_revision_written(boost::function1 const & x) -{ - on_revision_written=x; - pimpl->writer.set_on_revision_written(x); -} - -void -packet_db_valve::set_on_cert_written(boost::function1 const & x) -{ - on_cert_written=x; - pimpl->writer.set_on_cert_written(x); -} - -void -packet_db_valve::set_on_pubkey_written(boost::function1 - const & x) -{ - on_pubkey_written=x; - pimpl->writer.set_on_pubkey_written(x); -} - -void -packet_db_valve::set_on_keypair_written(boost::function1 - const & x) -{ - on_keypair_written=x; - pimpl->writer.set_on_keypair_written(x); -} - -void -packet_db_valve::consume_file_data(file_id const & ident, - file_data const & dat) -{ - DOIT(delayed_file_data_packet(ident, dat)); -} - -void -packet_db_valve::consume_file_delta(file_id const & id_old, - file_id const & id_new, - file_delta const & del) -{ - DOIT(delayed_file_delta_packet(id_old, id_new, del, true)); -} - -void -packet_db_valve::consume_file_delta(file_id const & id_old, - file_id const & id_new, - file_delta const & del, - bool write_full) -{ - DOIT(delayed_file_delta_packet(id_old, id_new, del, true, write_full)); -} - -void -packet_db_valve::consume_file_reverse_delta(file_id const & id_new, - file_id const & id_old, - file_delta const & del) -{ - DOIT(delayed_file_delta_packet(id_old, id_new, del, false)); -} - -void -packet_db_valve::consume_revision_data(revision_id const & ident, - revision_data const & dat) -{ - DOIT(delayed_revision_data_packet(ident, dat)); -} - -void -packet_db_valve::consume_revision_cert(revision const & t) -{ - 
DOIT(delayed_revision_cert_packet(t)); -} - -void -packet_db_valve::consume_public_key(rsa_keypair_id const & ident, - base64< rsa_pub_key > const & k) -{ - DOIT(delayed_public_key_packet(ident, k)); -} - -void -packet_db_valve::consume_key_pair(rsa_keypair_id const & ident, - keypair const & kp) -{ - DOIT(delayed_keypair_packet(ident, kp)); -} - -#undef DOIT - // --- packet writer --- packet_writer::packet_writer(ostream & o) : ost(o) {} ============================================================ --- packet.hh 41eff9ec50dfb14b240cfbf962d95c84bf5474da +++ packet.hh 35b466b5467cb8b2cb27d768b6217dc21a10977f @@ -133,52 +133,6 @@ keypair const & kp); }; -// this writer is just like packet_db_writer, except that none of your calls -// have any effect until after calling open_valve; at that point, all previous -// consume_* calls will suddenly take effect. - -struct packet_db_valve : public packet_consumer -{ -private: - struct impl; - std::auto_ptr pimpl; -public: - packet_db_valve(app_state & app, - bool take_keys = false); - virtual ~packet_db_valve(); - virtual void set_on_revision_written(boost::function1 - const & x); - virtual void set_on_cert_written(boost::function1 - const & x); - virtual void set_on_pubkey_written(boost::function1 - const & x); - virtual void set_on_keypair_written(boost::function1 - const & x); - virtual void consume_file_data(file_id const & ident, - file_data const & dat); - virtual void consume_file_delta(file_id const & id_old, - file_id const & id_new, - file_delta const & del); - virtual void consume_file_delta(file_id const & id_old, - file_id const & id_new, - file_delta const & del, - bool write_full); - virtual void consume_file_reverse_delta(file_id const & id_new, - file_id const & id_old, - file_delta const & del); - - virtual void consume_revision_data(revision_id const & ident, - revision_data const & dat); - virtual void consume_revision_cert(revision const & t); - - virtual void consume_public_key(rsa_keypair_id const & 
ident, - base64< rsa_pub_key > const & k); - virtual void consume_key_pair(rsa_keypair_id const & ident, - keypair const & kp); - - virtual void open_valve(); -}; - size_t read_packets(std::istream & in, packet_consumer & cons); #endif