Was ich tun möchte, ist eine Art "pipe" (wie eine Pipe zwischen Prozessen), aber zwischen C++ iostreams innerhalb des gleichen Programms zu erstellen. Ich habe eine Funktion, die einen Eingabestream als Argument benötigt, aber meine Daten stammen aus einem Ausgabestream. Gibt es also eine Standardmethode, um den Ausgang eines std::ostream
in den Eingang eines std::istream
zu leiten?C++ Ausgabestream mit Eingabestream verbinden
Antwort
Sie können einen erstellen, wobei der Ausgang zu einem Puffer geht und std::overflow()
blockiert, wenn der Puffer voll wird. Am anderen Ende haben Sie einen Eingangspuffer, der auf underflow()
blockiert, wenn der Puffer leer wird. Offensichtlich würden Lesen und Schreiben in zwei verschiedenen Threads sein.
Das schwierige Geschäft ist, wie man die zwei Puffer synchronisiert: Die Ströme benutzen keine Synchronisierungsoperationen beim Zugreifen auf die Puffer. Nur wenn eine der virtuellen Funktionen aufgerufen wird, können Sie die Operation abfangen und mit der Synchronisation umgehen. Auf der anderen Seite ist die Verwendung eines Puffers ziemlich ineffizient. Die Art und Weise, wie ich dieses Problem angehen würde, besteht darin, einen relativ kleinen Ausgabepuffer (z. B. 256 char
s) zu verwenden und auch sync()
zu überschreiben, um diese Funktion für die Übertragung von Zeichen in den Eingabepuffer zu verwenden. Die streambuf
würde einen Mutex für die Synchronisation und eine Bedingungsvariable verwenden, um einen vollen Eingabepuffer bei Ausgabe und einen leeren Eingabepuffer bei Eingabe zu blockieren. Um das saubere Herunterfahren zu unterstützen, sollte es auch eine Funktion geben, die ein Flag setzt, dass keine weiteren Eingaben mehr kommen und alle weiteren Ausgabefunktionen fehlschlagen sollten.
Das Erstellen der tatsächlichen Implementierung zeigt, dass zwei Puffer nicht ausreichen: Die Threads, die auf den Eingangs- und den Ausgangspuffer zugreifen, können aktiv sein, wenn der jeweils andere Puffer blockiert. Somit wird ein dritter Zwischenpuffer benötigt. Mit dieser kleinen Änderung des obigen Plans ist unten ein gewisser Code (er verwendet winzige Puffer, um sicherzustellen, dass es tatsächlich Überläufe und Unterläufe gibt; für eine tatsächliche Verwendung sollte zumindest der Eingangspuffer wahrscheinlich größer sein).
// threadbuf.cpp -*-C++-*-
// ----------------------------------------------------------------------------
// Copyright (C) 2013 Dietmar Kuehl http://www.dietmar-kuehl.de
//
// Permission is hereby granted, free of charge, to any person
// obtaining a copy of this software and associated documentation
// files (the "Software"), to deal in the Software without restriction,
// including without limitation the rights to use, copy, modify,
// merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be
// included in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
// OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
// WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
// OTHER DEALINGS IN THE SOFTWARE.
// ----------------------------------------------------------------------------
#include <algorithm>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <streambuf>
#include <string>
#include <thread>
// ----------------------------------------------------------------------------
class threadbuf
: public std::streambuf
{
private:
typedef std::streambuf::traits_type traits_type;
typedef std::string::size_type string_size_t;
std::mutex d_mutex;
std::condition_variable d_condition;
std::string d_out;
std::string d_in;
std::string d_tmp;
char* d_current;
bool d_closed;
public:
threadbuf(string_size_t out_size = 16, string_size_t in_size = 64)
: d_out(std::max(string_size_t(1), out_size), ' ')
, d_in(std::max(string_size_t(1), in_size), ' ')
, d_tmp(std::max(string_size_t(1), in_size), ' ')
, d_current(&this->d_tmp[0])
, d_closed(false)
{
this->setp(&this->d_out[0], &this->d_out[0] + this->d_out.size() - 1);
this->setg(&this->d_in[0], &this->d_in[0], &this->d_in[0]);
}
void close()
{
{
std::unique_lock<std::mutex> lock(this->d_mutex);
this->d_closed = true;
while (this->pbase() != this->pptr()) {
this->internal_sync(lock);
}
}
this->d_condition.notify_all();
}
private:
int_type underflow()
{
if (this->gptr() == this->egptr())
{
std::unique_lock<std::mutex> lock(this->d_mutex);
while (&this->d_tmp[0] == this->d_current && !this->d_closed) {
this->d_condition.wait(lock);
}
if (&this->d_tmp[0] != this->d_current) {
std::streamsize size(this->d_current - &this->d_tmp[0]);
traits_type::copy(this->eback(), &this->d_tmp[0],
this->d_current - &this->d_tmp[0]);
this->setg(this->eback(), this->eback(), this->eback() + size);
this->d_current = &this->d_tmp[0];
this->d_condition.notify_one();
}
}
return this->gptr() == this->egptr()
? traits_type::eof()
: traits_type::to_int_type(*this->gptr());
}
int_type overflow(int_type c)
{
std::unique_lock<std::mutex> lock(this->d_mutex);
if (!traits_type::eq_int_type(c, traits_type::eof())) {
*this->pptr() = traits_type::to_char_type(c);
this->pbump(1);
}
return this->internal_sync(lock)
? traits_type::eof()
: traits_type::not_eof(c);
}
int sync()
{
std::unique_lock<std::mutex> lock(this->d_mutex);
return this->internal_sync(lock);
}
int internal_sync(std::unique_lock<std::mutex>& lock)
{
char* end(&this->d_tmp[0] + this->d_tmp.size());
while (this->d_current == end && !this->d_closed) {
this->d_condition.wait(lock);
}
if (this->d_current != end)
{
std::streamsize size(std::min(end - d_current,
this->pptr() - this->pbase()));
traits_type::copy(d_current, this->pbase(), size);
this->d_current += size;
std::streamsize remain((this->pptr() - this->pbase()) - size);
traits_type::move(this->pbase(), this->pptr(), remain);
this->setp(this->pbase(), this->epptr());
this->pbump(remain);
this->d_condition.notify_one();
return 0;
}
return traits_type::eof();
}
};
// ----------------------------------------------------------------------------
static void writer(std::ostream& out)
{
for (std::string line; std::getline(std::cin, line);)
{
out << "writer: '" << line << "'\n";
}
}
// ----------------------------------------------------------------------------
static void reader(std::istream& in)
{
for (std::string line; std::getline(in, line);)
{
std::cout << "reader: '" << line << "'\n";
}
}
// ----------------------------------------------------------------------------
int main()
{
try
{
threadbuf sbuf;
std::ostream out(&sbuf);
std::istream in(&sbuf);
std::thread write(&::writer, std::ref(out));
std::thread read(&::reader, std::ref(in));
write.join();
sbuf.close();
read.join();
}
catch (std::exception const& ex)
{
std::cerr << "ERROR: " << ex.what() << "\n";
}
}
+1 für die Mühe; Das OP war sicherlich auf der Suche nach einer viel schnelleren und einfacheren Lösung. – Walter
Nun, mit dem obigen Code ** ist ** schnell für snybody, aber ich :) Ich habe ähnliche Anfrage zuvor gesehen, d. H. Es kann auch für andere nützlich sein. ... und es war eine interessante Übung, um das umzusetzen, was ich vorher nur skizziert habe. Endlich: Mir ist keine einfachere Lösung bekannt! –
Sie sind eine spiegelnde Legende Dietmar. Ich benutze dies in einem Unit-Test und es funktioniert ein Vergnügen. Vielen Dank. – MattSmith
Ist std :: stringstream Ihrem Bedarf? Wenn nein, erkläre warum. – AProgrammer
Es gibt ein Iostream (beachten Sie, dass es am Anfang ein 'i' und ein' o' hat). Sie pumpen Daten an einem Ende und es kommt dort raus. Ist es das was du willst. –
-1 unterspezifizierte Frage. –