circle-discuss
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[circle] automated testing


From: malcolm handley
Subject: [circle] automated testing
Date: Thu, 24 Jun 2004 00:10:30 -0700

I decided to test circle in an isolated place before trying to embed it in a larger program that currently uses freenet for distributing data and wrote the test below.

I am thrilled with how easy it was to write both of these tests and how short they are.

I have two questions, though:

1. Do you guys mind if I run code like this that makes nodes (which will not use proxies, regardless of whether they are running on a machine with NAT), connects them to the network and then destroys them very soon after? Is that bad for the network overall?

2. The second test is a little hacky. I don't know how long I should wait (the for "for x in range(1000)" part) for the publication of the data to spread as far as it needs to. (In fact, I don't have much idea how publication works.) Is there some way that I can determine how long to wait? Is there something that I can read about how this works? I couldn't find much on the web site.

Anyway, here the test is, in case anyone cares. I thought that you might like to hear good news, rather than bugs, from me for once. To run it place the whole thing in a file called nodetest.py in circlelib and run "python nodetest.py -v".

>>>> begin nodetest.py
from __future__ import generators

import time
import unittest

import hash
import node
import proxy
import utility

class Test(unittest.TestCase):
    def makeConnectedNodes(self, numNodes):
        nodes = [node.Node() for x in range(numNodes)]
        for n in nodes: n.start(proxy=None)
        connected = 0
        while not connected:
            utility._daemon_action_timeout()
            connected = 1
            for n in nodes:
                if not n.is_connected(): connected = 0
        return nodes

    def testRpc(self):
        n0, n1 = self.makeConnectedNodes(numNodes=2)

        class Handler:
            def handle(self, request, address, callId):
                return "rpc handled"
        n0.add_handler(msg_name="nodetest", how=Handler())

        finishedDict = {"finished":0}
        def callMethod():
ticket, template, wait = n1.call(address=n0.address, query=("nodetest",))
            if wait: yield "call", (n1, ticket)
            result = n1.get_reply(ticket,template)
            self.assertEquals(result, "rpc handled")
            finishedDict["finished"] = 1
        utility.start_thread(callMethod())
        while not finishedDict["finished"]:
            utility._daemon_action_timeout()

    def testData(self):
        n0, n1 = self.makeConnectedNodes(numNodes=2)

        publishedName = "testData-%f" % time.time()
        publishedData = {"type":"test-data", "attr":"value"}

        n0.publish(name=hash.hash_of(publishedName), info=publishedData)

        for x in range(1000):
            time.sleep(0.01)
            utility._daemon_action_timeout()

        pipe = n0.retrieve(hash.hash_of(publishedName))
        foundAddress = None
        try:
            while not pipe.finished():
                for address, data in pipe.read_all():
                    assert not foundAddress
                    self.assertEquals(data, publishedData)
                    foundAddress = address
                utility._daemon_action_timeout()
                time.sleep(1)
        finally:
            pipe.stop()
        self.assertEquals(foundAddress, n0.address)

if __name__ == "__main__":
    unittest.main()
<<<< end nodetest.py





reply via email to

[Prev in Thread] Current Thread [Next in Thread]