# # # add_file "src/monotone/MonotoneThread.cpp" # content [05706d777769f38f7728955b17d4e227c6e1b711] # # add_file "src/monotone/MonotoneThread.h" # content [e11f5a4d15e376cb9a39fa268e1facb8e39bef67] # ============================================================ --- src/monotone/MonotoneThread.cpp 05706d777769f38f7728955b17d4e227c6e1b711 +++ src/monotone/MonotoneThread.cpp 05706d777769f38f7728955b17d4e227c6e1b711 @@ -0,0 +1,375 @@ +/*************************************************************************** + * Copyright (C) 2007 by Thomas Keller * + * address@hidden * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * + ***************************************************************************/ + +#include "MonotoneThread.h" +#include "StdioParser.h" +#include "BasicIOParser.h" + +#include +#include + +MonotoneTask::MonotoneTask() +{ + init(ByteArrayList(), ByteArrayList()); +} + +MonotoneTask::MonotoneTask(const MonotoneTask & other) +{ + returnCode = other.returnCode; + commandNumber = other.commandNumber; + arguments = other.arguments; + options = other.options; + output = other.output; +} + +MonotoneTask::MonotoneTask(const QStringList & args) +{ + init(stringToByteArrayList(args), ByteArrayList()); +} + +MonotoneTask::MonotoneTask(const QStringList & args, const QStringList & opts) +{ + init(stringToByteArrayList(args), stringToByteArrayList(opts)); +} + +MonotoneTask::MonotoneTask(const ByteArrayList & args) +{ + init(args, ByteArrayList()); +} + +MonotoneTask::MonotoneTask(const ByteArrayList & args, const ByteArrayList & opts) +{ + init(args, opts); +} + +void MonotoneTask::init(const ByteArrayList & args, const ByteArrayList & opts) +{ + arguments = args; + options = opts; + returnCode = -1; + + static bool initialized = false; + if (!initialized) + { + qRegisterMetaType("MonotoneTask"); + initialized = true; + } +} + +ByteArrayList MonotoneTask::stringToByteArrayList(const QStringList & list) +{ + ByteArrayList byteArrayList; + foreach (QString entry, list) + { + byteArrayList.append(entry.toUtf8()); + } + return byteArrayList; +} + +QByteArray MonotoneTask::getEncodedInput() const +{ + QByteArray commandLine; + QTextStream streamCmdLine(&commandLine); + + if (options.size() > 0) + { + // currently mtn can only understand key => value option pairs + I(options.size() % 2 == 0); + + streamCmdLine << "o"; + for (int i=0, c=options.size(); i 0); + + streamCmdLine << "l"; + for (int i=0, c=arguments.size(); isetWorkingDirectory(workspacePath); + } + + L(QString("starting %1 %2").arg(mtnBinary).arg(args.join(" "))); + process->start(mtnBinary, args); + + if (!process->waitForStarted()) + { + emit aborted(process->error(), + QString::fromUtf8(process->readAllStandardError()) + ); + cleanup(process); + return; + } + + QTextStream streamProcess(process); + + QByteArray buffer; + QByteArray output; + + bool processingTask = false; + + while (!doAbort) + { + if (process->state() != QProcess::Running) + { + emit aborted(process->error(), + QString::fromUtf8(process->readAllStandardError()) + ); + cleanup(process); + return; + } + + if (queue.size() == 0) continue; + + if (!processingTask) + { + MonotoneTask task = queue.head(); + streamProcess << task.getEncodedInput(); + streamProcess.flush(); + processingTask = true; + } + + if (!process->waitForReadyRead(-1)) + { + emit aborted(process->error(), + QString::fromUtf8(process->readAllStandardError()) + ); + cleanup(process); + return; + } + + // FIXME: what about stderr output here? + buffer.append(process->readAllStandardOutput()); + StdioParser parser(buffer); + + // if the chunk is not yet complete, try again later + if (!parser.parse()) + { + continue; + } + + buffer = parser.getLeftBytes(); + output.append(parser.getPayload()); + int returnCode = parser.getErrorCode(); + + // TODO: support for other chunk types here? + if (parser.getChunkType() == 'm') + { + continue; + } + + MonotoneTask task = queue.dequeue(); + task.setOutput(output); + task.setReturnCode(returnCode); + processingTask = false; + output.clear(); + + emit taskFinished(task); + } + cleanup(process); +} + +void MonotoneThread::cleanup(QProcess * proc) +{ + QMutexLocker locker(&lock); + + doAbort = true; + + if (queue.size() > 0) + { + foreach (MonotoneTask task, queue) + { + emit taskAborted(task); + } + queue.clear(); + } + + // close the pipes + proc->close(); + // send SIGTERM + proc->terminate(); + // block until the process has really been finished + proc->waitForFinished(); +} + +void MonotoneThread::abort() +{ + QMutexLocker locker(&lock); + doAbort = true; +} + +// FIXME: I think we need to care somehow if we pass +// threads around - i.e. use shared ptrs or something +MonotoneThread * MonotoneThreadManager::getThread(const QString & workspace) +{ + QString normalizedWorkspace = normalizeWorkspacePath(workspace); + QString databaseFilePath = getDatabaseFilePath(normalizedWorkspace); + return getThread(databaseFilePath, normalizedWorkspace); +} + +MonotoneThread * MonotoneThreadManager::getThread( + const QString & database, const QString & workspace = QString()) +{ + if (!threadMap.contains(database)) + { + MonotoneThread * thread = new MonotoneThread(mtnPath, database, workspace); + threadMap.insert(database, thread); + } + + // FIXME: we may want to add support for multiple threads for one + // and the same database here in the future... + return threadMap.value(database); +} + +QString MonotoneThreadManager::normalizeWorkspacePath(const QString & workspace) +{ + QDir tempDir(workspace); + if (!tempDir.exists()) + { + throw GuitoneException(tr("workspace directory does not exist")); + } + + bool found = false; + QString normalizedWorkspace; + do + { + if (tempDir.cd("_MTN")) + { + tempDir.cdUp(); + normalizedWorkspace = tempDir.absolutePath(); + found = true; + break; + } + } + while (!tempDir.isRoot() && tempDir.cdUp()); + + if (!found) + { + throw GuitoneException(tr("could not find _MTN directory")); + } + + return normalizedWorkspace; +} + +// we assume that the workspace was already normalized here +QString MonotoneThreadManager::getDatabaseFilePath(const QString & workspace) +{ + // now check again if we know it + if (workspaceMap.contains(workspace)) + { + return workspaceMap.value(workspace); + } + + QFile optionsFile(workspace + "/_MTN/options"); + if (!optionsFile.open(QIODevice::ReadOnly | QIODevice::Text)) + { + throw GuitoneException(tr("could not open _MTN/options for reading")); + } + + QByteArray contents = optionsFile.readAll(); + optionsFile.close(); + + if (contents.size() == 0) + { + throw GuitoneException(tr("file _MTN/options is empty")); + } + + BasicIOParser parser(QString::fromUtf8(contents)); + if (!parser.parse()) + { + throw GuitoneException(tr("could not parse basic_io from _MTN/options")); + } + StanzaList stanzas = parser.getStanzas(); + I(stanzas.size() == 1); + Stanza st = stanzas.at(0); + + QString databaseFilePath; + foreach (StanzaEntry entry, st) + { + if (entry.sym == "database") + { + I(entry.vals.size() == 1); + databaseFilePath = entry.vals.at(0); + break; + } + } + + if (databaseFilePath.isEmpty()) + { + throw GuitoneException(tr("could not find database for workspace")); + } + + // remember what we've just found for later requests + workspaceMap.insert(workspace, databaseFilePath); + return databaseFilePath; +} + ============================================================ --- src/monotone/MonotoneThread.h e11f5a4d15e376cb9a39fa268e1facb8e39bef67 +++ src/monotone/MonotoneThread.h e11f5a4d15e376cb9a39fa268e1facb8e39bef67 @@ -0,0 +1,122 @@ +/*************************************************************************** + * Copyright (C) 2007 by Thomas Keller * + * address@hidden * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * + ***************************************************************************/ + +#ifndef MONOTONE_THREAD_H +#define MONOTONE_THREAD_H + +#include +#include +#include +#include +#include + +#include "vocab.h" + +class MonotoneTask +{ +public: + MonotoneTask(); + MonotoneTask(const MonotoneTask &); + MonotoneTask(const QStringList &); + MonotoneTask(const QStringList &, const QStringList &); + MonotoneTask(const ByteArrayList &); + MonotoneTask(const ByteArrayList &, const ByteArrayList &); + + void setCommandNumber(int num) { commandNumber = num; } + void setOutput(const QByteArray & out) { output = out; } + void setReturnCode(int code) { returnCode = code; } + + QByteArray getEncodedInput() const; + ByteArrayList getArguments() const { return arguments; } + ByteArrayList getOptions() const { return options; } + QByteArray getOutput() const { return output; } + QString getOutputUtf8() const { return QString::fromUtf8(output); } + bool getReturnCode() const { return returnCode; } + int getCommandNumber() const { return commandNumber; } + +private: + void init(const ByteArrayList &, const ByteArrayList &); + ByteArrayList stringToByteArrayList(const QStringList &); + + int returnCode; + int commandNumber; + + ByteArrayList arguments; + ByteArrayList options; + QByteArray output; +}; + +class MonotoneThread : public QThread +{ + Q_OBJECT + +public: + MonotoneThread(const QString &, const QString &, const QString & workspace = QString()); + ~MonotoneThread(); + QString getDatabaseFilePath() const { return databasePath; } + QString getWorkspacePath() const { return workspacePath; } + int getQueueCount() const { return queue.size(); } + +protected: + void run(); + +public slots: + int enqueueTask(const MonotoneTask &); + void abort(); + +signals: + void taskFinished(const MonotoneTask &); + void taskAborted(const MonotoneTask &); + void aborted(QProcess::ProcessError, const QString &); + +private: + void cleanup(QProcess *); + + static const int StdioBufferSize; + + bool doAbort; + int commandNumber; + QString mtnBinary; + QString databasePath; + QString workspacePath; + QQueue queue; + QMutex lock; +}; + +class MonotoneThreadManager : public QObject +{ + Q_OBJECT + +public: + MonotoneThreadManager(const QString & p) : mtnPath(p) {}; + ~MonotoneThreadManager() {}; + + MonotoneThread * getThread(const QString &); + MonotoneThread * getThread(const QString &, const QString &); + +private: + QMap threadMap; + QMap workspaceMap; + QString mtnPath; + + QString getDatabaseFilePath(const QString &); + QString normalizeWorkspacePath(const QString &); +}; +#endif