[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Commit-gnuradio] [gnuradio] 01/04: controlport: cleanup and thread safe
From: |
git |
Subject: |
[Commit-gnuradio] [gnuradio] 01/04: controlport: cleanup and thread safety. |
Date: |
Thu, 16 Apr 2015 15:08:09 +0000 (UTC) |
This is an automated email from the git hooks/post-receive script.
jcorgan pushed a commit to branch master
in repository gnuradio.
commit e9d9cf5aced8ed80bc71d817c114866babef3942
Author: Nate Goergen <address@hidden>
Date: Thu Apr 16 10:26:49 2015 -0400
controlport: cleanup and thread safety.
Cleaned up some of the templates for only the necessary stuff.
Adds mutex lock protections around getters and setters.
---
gnuradio-runtime/include/gnuradio/rpcbufferedget.h | 3 +-
.../include/gnuradio/rpcserver_booter_thrift.h | 3 +-
.../include/gnuradio/rpcserver_thrift.h | 3 +
.../include/gnuradio/thrift_application_base.h | 7 +-
.../include/gnuradio/thrift_server_template.h | 87 ++++++++++------------
.../controlport/thrift/rpcserver_booter_thrift.cc | 14 ++--
.../lib/controlport/thrift/rpcserver_thrift.cc | 8 ++
7 files changed, 62 insertions(+), 63 deletions(-)
diff --git a/gnuradio-runtime/include/gnuradio/rpcbufferedget.h
b/gnuradio-runtime/include/gnuradio/rpcbufferedget.h
index ebd740b..ad05551 100644
--- a/gnuradio-runtime/include/gnuradio/rpcbufferedget.h
+++ b/gnuradio-runtime/include/gnuradio/rpcbufferedget.h
@@ -25,7 +25,6 @@
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/mutex.hpp>
-#include <stdio.h>
template<typename TdataType>
class rpcbufferedget {
@@ -49,8 +48,8 @@ public:
}
TdataType get() {
- d_data_needed = true;
boost::mutex::scoped_lock lock(d_buffer_lock);
+ d_data_needed = true;
d_data_ready.wait(lock);
return d_buffer;
}
diff --git a/gnuradio-runtime/include/gnuradio/rpcserver_booter_thrift.h
b/gnuradio-runtime/include/gnuradio/rpcserver_booter_thrift.h
index 28900a4..fd1da09 100644
--- a/gnuradio-runtime/include/gnuradio/rpcserver_booter_thrift.h
+++ b/gnuradio-runtime/include/gnuradio/rpcserver_booter_thrift.h
@@ -34,8 +34,7 @@ class rpcserver_booter_thrift
: public virtual rpcserver_booter_base,
public virtual thrift_server_template<rpcserver_base,
rpcserver_thrift,
- rpcserver_booter_thrift,
-
boost::shared_ptr<GNURadio::ControlPortIf> >
+ rpcserver_booter_thrift>
{
public:
rpcserver_booter_thrift();
diff --git a/gnuradio-runtime/include/gnuradio/rpcserver_thrift.h
b/gnuradio-runtime/include/gnuradio/rpcserver_thrift.h
index 027a9ea..203be66 100644
--- a/gnuradio-runtime/include/gnuradio/rpcserver_thrift.h
+++ b/gnuradio-runtime/include/gnuradio/rpcserver_thrift.h
@@ -32,6 +32,7 @@
#include "thrift/ControlPort.h"
#include "thrift/gnuradio_types.h"
#include <boost/format.hpp>
+#include <boost/thread/mutex.hpp>
#define S(x) #x
#define S_(x) S(x)
@@ -61,6 +62,8 @@ public:
virtual void shutdown();
private:
+ boost::mutex d_callback_map_lock;
+
typedef std::map<std::string, configureCallback_t> ConfigureCallbackMap_t;
ConfigureCallbackMap_t d_setcallbackmap;
diff --git a/gnuradio-runtime/include/gnuradio/thrift_application_base.h
b/gnuradio-runtime/include/gnuradio/thrift_application_base.h
index aa50c55..4af5e88 100644
--- a/gnuradio-runtime/include/gnuradio/thrift_application_base.h
+++ b/gnuradio-runtime/include/gnuradio/thrift_application_base.h
@@ -28,6 +28,7 @@
#include <gnuradio/prefs.h>
#include <gnuradio/thread/thread.h>
#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/scoped_ptr.hpp>
namespace {
// Time, in milliseconds, to wait between checks to the Thrift runtime to
see if
@@ -83,7 +84,7 @@ public:
/*!
* Destructor for the application. Since shutdown and cleanup of the
* runtime is typically custom to a particular booter
- * implementation, this must be implemented as a specalized function
+ * implementation, this must be implemented as a specialized function
* for a particular booter. Thus a template implementation is not
* provided here.
*/
@@ -118,7 +119,7 @@ protected:
/*!
* Reference to the Thrift runtime.
*/
- std::auto_ptr<apache::thrift::server::TServer> d_thriftserver;
+ boost::scoped_ptr<apache::thrift::server::TServer> d_thriftserver;
/*!
* Max number of attempts when checking the Thrift runtime for
@@ -175,7 +176,7 @@ private:
// Pointer to the structure containing staticly allocated
// state information for the applicaiton_base singleton.
- static std::auto_ptr<thrift_application_base_impl > p_impl;
+ static boost::scoped_ptr<thrift_application_base_impl > p_impl;
// Mutex to protect the endpoint string.
gr::thread::mutex d_lock;
diff --git a/gnuradio-runtime/include/gnuradio/thrift_server_template.h
b/gnuradio-runtime/include/gnuradio/thrift_server_template.h
index 1e9059d..b8f5448 100644
--- a/gnuradio-runtime/include/gnuradio/thrift_server_template.h
+++ b/gnuradio-runtime/include/gnuradio/thrift_server_template.h
@@ -39,7 +39,7 @@
using namespace apache;
-template<typename TserverBase, typename TserverClass, typename TImplClass,
typename TThriftClass>
+template<typename TserverBase, typename TserverClass, typename TImplClass>
class thrift_server_template : public thrift_application_base<TserverBase,
TImplClass>
{
public:
@@ -50,9 +50,12 @@ protected:
TserverBase* i_impl();
friend class thrift_application_base<TserverBase, TImplClass>;
- TserverBase* d_server;
-
private:
+ boost::shared_ptr<TserverClass> d_handler;
+ boost::shared_ptr<thrift::TProcessor> d_processor;
+ boost::shared_ptr<thrift::transport::TServerTransport> d_serverTransport;
+ boost::shared_ptr<thrift::transport::TTransportFactory> d_transportFactory;
+ boost::shared_ptr<thrift::protocol::TProtocolFactory> d_protocolFactory;
/**
* Custom TransportFactory that allows you to override the default Thrift
buffer size
* of 512 bytes.
@@ -76,9 +79,14 @@ private:
};
};
-template<typename TserverBase, typename TserverClass, typename TImplClass,
typename TThriftClass>
-thrift_server_template<TserverBase, TserverClass, TImplClass,
TThriftClass>::thrift_server_template
-(TImplClass* _this) : thrift_application_base<TserverBase, TImplClass>(_this)
+template<typename TserverBase, typename TserverClass, typename TImplClass>
+thrift_server_template<TserverBase, TserverClass,
TImplClass>::thrift_server_template
+(TImplClass* _this) : thrift_application_base<TserverBase, TImplClass>(_this),
+d_handler(new TserverClass()),
+d_processor(new GNURadio::ControlPortProcessor(d_handler)),
+d_serverTransport(),
+d_transportFactory(),
+d_protocolFactory(new thrift::protocol::TBinaryProtocolFactory())
{
gr::logger_ptr logger, debug_logger;
gr::configure_default_loggers(logger, debug_logger, "controlport");
@@ -87,74 +95,59 @@ thrift_server_template<TserverBase, TserverClass,
TImplClass, TThriftClass>::thr
std::string thrift_config_file =
gr::prefs::singleton()->get_string("ControlPort", "config", "");
if(thrift_config_file.length() > 0) {
- gr::prefs::singleton()->add_config_file(thrift_config_file);
+ gr::prefs::singleton()->add_config_file(thrift_config_file);
}
// Collect configuration options from the Thrift config file;
// defaults if the config file doesn't exist or list the specific
// options.
port = static_cast<unsigned int>(gr::prefs::singleton()->get_long("thrift",
"port",
- thrift_application_base<TserverBase, TImplClass>::d_default_thrift_port));
+
thrift_application_base<TserverBase, TImplClass>::d_default_thrift_port));
nthreads = static_cast<unsigned
int>(gr::prefs::singleton()->get_long("thrift", "nthreads",
- thrift_application_base<TserverBase,
TImplClass>::d_default_num_thrift_threads));
+
thrift_application_base<TserverBase,
TImplClass>::d_default_num_thrift_threads));
buffersize = static_cast<unsigned
int>(gr::prefs::singleton()->get_long("thrift", "buffersize",
- thrift_application_base<TserverBase,
TImplClass>::d_default_thrift_buffer_size));
-
- boost::shared_ptr<TserverClass> handler(new TserverClass());
-
- boost::shared_ptr<thrift::TProcessor>
- processor(new GNURadio::ControlPortProcessor(handler));
+
thrift_application_base<TserverBase,
TImplClass>::d_default_thrift_buffer_size));
- boost::shared_ptr<thrift::transport::TServerTransport>
- serverTransport(new thrift::transport::TServerSocket(port));
+ d_serverTransport.reset(new thrift::transport::TServerSocket(port));
- boost::shared_ptr<thrift::transport::TTransportFactory>
- transportFactory(new
thrift_server_template::TBufferedTransportFactory(buffersize));
-
- boost::shared_ptr<thrift::protocol::TProtocolFactory>
- protocolFactory(new thrift::protocol::TBinaryProtocolFactory());
+ d_transportFactory.reset(new
thrift_server_template::TBufferedTransportFactory(buffersize));
if(nthreads <= 1) {
- // "Thrift: Single-threaded server"
- //std::cout << "Thrift Single-threaded server" << std::endl;
- thrift_application_base<TserverBase, TImplClass>::d_thriftserver.reset(
- new thrift::server::TSimpleServer(processor, serverTransport,
- transportFactory, protocolFactory));
+ // "Thrift: Single-threaded server"
+ //std::cout << "Thrift Single-threaded server" << std::endl;
+ thrift_application_base<TserverBase, TImplClass>::d_thriftserver.reset(
+ new thrift::server::TSimpleServer(d_processor, d_serverTransport,
d_transportFactory, d_protocolFactory));
}
else {
- //std::cout << "Thrift Multi-threaded server : " << nthreads << std::endl;
- boost::shared_ptr<thrift::concurrency::ThreadManager> threadManager
- (thrift::concurrency::ThreadManager::newSimpleThreadManager(nthreads));
-
- boost::shared_ptr<thrift::concurrency::PlatformThreadFactory> threadFactory
- (boost::shared_ptr<thrift::concurrency::PlatformThreadFactory>
- (new thrift::concurrency::PlatformThreadFactory()));
+ //std::cout << "Thrift Multi-threaded server : " << d_nthreads <<
std::endl;
+ boost::shared_ptr<thrift::concurrency::ThreadManager> threadManager(
+
thrift::concurrency::ThreadManager::newSimpleThreadManager(nthreads));
- threadManager->threadFactory(threadFactory);
+ threadManager->threadFactory(
+ boost::shared_ptr<thrift::concurrency::PlatformThreadFactory>(
+ new thrift::concurrency::PlatformThreadFactory()));
- threadManager->start();
+ threadManager->start();
- thrift_application_base<TserverBase, TImplClass>::d_thriftserver.reset(
- new thrift::server::TThreadPoolServer(processor, serverTransport,
- transportFactory, protocolFactory,
- threadManager));
+ thrift_application_base<TserverBase, TImplClass>::d_thriftserver.reset(
+ new thrift::server::TThreadPoolServer(d_processor, d_serverTransport,
+ d_transportFactory,
d_protocolFactory,
+ threadManager));
}
-
- d_server = handler.get();
}
-template<typename TserverBase, typename TserverClass, typename TImplClass,
typename TThriftClass>
-thrift_server_template<TserverBase, TserverClass,TImplClass,
TThriftClass>::~thrift_server_template()
+template<typename TserverBase, typename TserverClass, typename TImplClass>
+thrift_server_template<TserverBase,
TserverClass,TImplClass>::~thrift_server_template()
{
}
-template<typename TserverBase, typename TserverClass, typename TImplClass,
typename TThriftClass>
-TserverBase* thrift_server_template<TserverBase, TserverClass, TImplClass,
TThriftClass>::i_impl()
+template<typename TserverBase, typename TserverClass, typename TImplClass>
+TserverBase* thrift_server_template<TserverBase, TserverClass,
TImplClass>::i_impl()
{
//std::cerr << "thrift_server_template: i_impl" << std::endl;
- return d_server;
+ return d_handler.get();
}
#endif /* THRIFT_SERVER_TEMPLATE_H */
diff --git a/gnuradio-runtime/lib/controlport/thrift/rpcserver_booter_thrift.cc
b/gnuradio-runtime/lib/controlport/thrift/rpcserver_booter_thrift.cc
index 40cfe1a..1d6cafe 100644
--- a/gnuradio-runtime/lib/controlport/thrift/rpcserver_booter_thrift.cc
+++ b/gnuradio-runtime/lib/controlport/thrift/rpcserver_booter_thrift.cc
@@ -42,8 +42,7 @@ namespace {
rpcserver_booter_thrift::rpcserver_booter_thrift() :
thrift_server_template<rpcserver_base,
rpcserver_thrift,
- rpcserver_booter_thrift,
- boost::shared_ptr<GNURadio::ControlPortIf> >(this),
+ rpcserver_booter_thrift>(this),
d_type(std::string(CONTROL_PORT_CLASS))
{;}
@@ -54,8 +53,7 @@ rpcserver_base*
rpcserver_booter_thrift::i()
{
return thrift_server_template<rpcserver_base, rpcserver_thrift,
- rpcserver_booter_thrift,
- GNURadio::ControlPortIf>::i();
+ rpcserver_booter_thrift>::i();
}
/*!
@@ -66,8 +64,7 @@ const std::vector<std::string>
rpcserver_booter_thrift::endpoints()
{
return thrift_server_template<rpcserver_base, rpcserver_thrift,
- rpcserver_booter_thrift,
- GNURadio::ControlPortIf>::endpoints();
+ rpcserver_booter_thrift>::endpoints();
}
// Specialized thrift_application_base attributes and functions
@@ -87,14 +84,13 @@ const unsigned int thrift_application_base<rpcserver_base,
rpcserver_booter_thri
ALRIGHT_DEFAULT_BUFFER_SIZE);
template<class rpcserver_base, class rpcserver_booter_thrift>
-std::auto_ptr<thrift_application_base_impl>
+boost::scoped_ptr<thrift_application_base_impl>
thrift_application_base<rpcserver_base, rpcserver_booter_thrift>::p_impl(
new thrift_application_base_impl());
template<class rpcserver_base, class rpcserver_booter_thrift>
thrift_application_base<rpcserver_base,
rpcserver_booter_thrift>::~thrift_application_base()
{
- GR_LOG_DEBUG(d_debug_logger, "thrift_application_base: shutdown");
if(d_thirft_is_running) {
d_thriftserver->stop();
d_thirft_is_running = false;
@@ -125,7 +121,7 @@ bool thrift_application_base<rpcserver_base,
rpcserver_booter_thrift>::applicati
const std::string boost_hostname(boost::asio::ip::host_name());
std::string endpoint = boost::str(boost::format("-h %1% -p %2%") %
boost_hostname % used_port);
- //std::cout << "Thrift endpoint: " << endpoint << " boost hostname: " <<
boost_hostname << std::endl;
+
set_endpoint(endpoint);
GR_LOG_INFO(d_logger, "Apache Thrift: " + endpoint);
diff --git a/gnuradio-runtime/lib/controlport/thrift/rpcserver_thrift.cc
b/gnuradio-runtime/lib/controlport/thrift/rpcserver_thrift.cc
index c4655d3..3e6eabc 100644
--- a/gnuradio-runtime/lib/controlport/thrift/rpcserver_thrift.cc
+++ b/gnuradio-runtime/lib/controlport/thrift/rpcserver_thrift.cc
@@ -49,6 +49,7 @@ void
rpcserver_thrift::registerConfigureCallback(const std::string &id,
const configureCallback_t callback)
{
+ boost::mutex::scoped_lock lock(d_callback_map_lock);
{
ConfigureCallbackMap_t::const_iterator iter(d_setcallbackmap.find(id));
if(iter != d_setcallbackmap.end()) {
@@ -68,6 +69,7 @@ rpcserver_thrift::registerConfigureCallback(const std::string
&id,
void
rpcserver_thrift::unregisterConfigureCallback(const std::string &id)
{
+ boost::mutex::scoped_lock lock(d_callback_map_lock);
ConfigureCallbackMap_t::iterator iter(d_setcallbackmap.find(id));
if(iter == d_setcallbackmap.end()) {
std::stringstream s;
@@ -86,6 +88,7 @@ void
rpcserver_thrift::registerQueryCallback(const std::string &id,
const queryCallback_t callback)
{
+ boost::mutex::scoped_lock lock(d_callback_map_lock);
{
QueryCallbackMap_t::const_iterator iter(d_getcallbackmap.find(id));
if(iter != d_getcallbackmap.end()) {
@@ -105,6 +108,7 @@ rpcserver_thrift::registerQueryCallback(const std::string
&id,
void
rpcserver_thrift::unregisterQueryCallback(const std::string &id)
{
+ boost::mutex::scoped_lock lock(d_callback_map_lock);
QueryCallbackMap_t::iterator iter(d_getcallbackmap.find(id));
if(iter == d_getcallbackmap.end()) {
std::stringstream s;
@@ -123,6 +127,7 @@ rpcserver_thrift::unregisterQueryCallback(const std::string
&id)
void
rpcserver_thrift::setKnobs(const GNURadio::KnobMap& knobs)
{
+ boost::mutex::scoped_lock lock(d_callback_map_lock);
std::for_each(knobs.begin(), knobs.end(),
set_f<GNURadio::KnobMap::value_type,ConfigureCallbackMap_t>
(d_setcallbackmap, cur_priv));
@@ -133,6 +138,7 @@ void
rpcserver_thrift::getKnobs(GNURadio::KnobMap& _return,
const GNURadio::KnobIDList& knobs)
{
+ boost::mutex::scoped_lock lock(d_callback_map_lock);
if(knobs.size() == 0) {
std::for_each(d_getcallbackmap.begin(), d_getcallbackmap.end(),
get_all_f<QueryCallbackMap_t::value_type,
QueryCallbackMap_t, GNURadio::KnobMap>
@@ -148,6 +154,7 @@ rpcserver_thrift::getKnobs(GNURadio::KnobMap& _return,
void
rpcserver_thrift::getRe(GNURadio::KnobMap& _return, const
GNURadio::KnobIDList& knobs)
{
+ boost::mutex::scoped_lock lock(d_callback_map_lock);
if(knobs.size() == 0) {
std::for_each(d_getcallbackmap.begin(), d_getcallbackmap.end(),
get_all_f<QueryCallbackMap_t::value_type,
QueryCallbackMap_t, GNURadio::KnobMap>
@@ -172,6 +179,7 @@ void
rpcserver_thrift::properties(GNURadio::KnobPropMap& _return,
const GNURadio::KnobIDList& knobs)
{
+ boost::mutex::scoped_lock lock(d_callback_map_lock);
if(knobs.size() == 0) {
std::for_each(d_getcallbackmap.begin(), d_getcallbackmap.end(),
properties_all_f<QueryCallbackMap_t::value_type,