2016-03-30 5 views
0

Also habe ich an diesem Haustier-Projekt von mir ein paar Wochen in meiner Freizeit gearbeitet und bin über ein Problem gestolpert, auf dem ich feststecke. Ich habe SO durchgesehen und das Problem gegoogelt. Ich habe nichts gefunden, was wirklich mit meinem spezifischen Problem zu tun hat.Cursor-Status, C++ Nebenläufigkeit und SQL-Server

Ich bin ein Multi-Threaded (ich bin sehr neu in Multithreading in C++), die einen ODBC-Treiber verwendet, um eine Verbindung zu einer lokalen SQL Server-Instanz herzustellen. Die Verbindung funktioniert gut und mit allem auf der Haupt-Thread ist in Ordnung. Aber wenn ich anfange, mehrere Threads zu verwenden (wie oben erwähnt, sieht es wie Scheiße aus - ich lerne durch Versuch und Irrtum), bekomme ich Fehlermeldungen, die ich mit der C++ - Concurrencty und der gespeicherten Prozedur zu tun habe läuft auf dem SQL-Server.

Die Fehlermeldung wie show_error angezeigt:

enter image description here Dies ist die gespeicherte Prozedur:

USE [master] 
GO 
/****** Object: StoredProcedure [dbo].[sp_addHistorical] Script Date: 30/03/2016 10:16:04 ******/ 
SET ANSI_NULLS ON 
GO 
SET QUOTED_IDENTIFIER ON 
GO 
ALTER PROCEDURE [dbo].[sp_addHistorical] 
    @Symbol nchar(10),@Date datetime, 
    @Open decimal(12,2),@Close decimal(12,2),@MinPrice decimal(12,2), 
    @MaxPrice decimal(12,2),@Volume int 
AS 
    SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED 
    BEGIN TRANSACTION 
    MERGE HistoricalStock WITH (UPDLOCK) AS myTarget 
     USING (SELECT @Symbol AS Symbol, 
     @Date AS Date, @Open AS [Open], @Close AS [Close], 
     @MinPrice AS MinPrice, @MaxPrice AS MaxPrice,@Volume AS Volume) AS mySource 
     ON mySource.Symbol = myTarget.Symbol AND mySource.Date = myTarget.Date 
     WHEN MATCHED 
      THEN UPDATE 
       SET [Open] = mySource.[Open], [Close] = mySource.[Close], 
       MinPrice = mySource.MinPrice, MaxPrice = mySource.MaxPrice, Volume = mySource.Volume    
     WHEN NOT MATCHED 
      THEN 
       INSERT(Symbol,Date,[Open],[Close],MinPrice,MaxPrice,Volume) 
       VALUES(@Symbol,@Date,@Open,@Close,@MinPrice,@MaxPrice,@Volume); 
    COMMIT 

Unten ist die Datenbank-Connector und das beschissene Threading:

#include "stdafx.h" 
#include "database_con.h" 

//////////////////////////////////////////////////////////////////////// 
// Show errors from the SQLHANDLE 

void database_con::show_error(unsigned int handletype, const SQLHANDLE& handle) 
{ 
    SQLWCHAR sqlstate[1024]; 
    SQLWCHAR message[1024]; 
    if (SQL_SUCCESS == SQLGetDiagRec(handletype, handle, 1, sqlstate, NULL, message, 1024, NULL)) 
     wcout << "Message: " << message << "\nSQLSTATE: " << sqlstate << endl; 
} 

std::wstring database_con::StringToWString(const std::string& s) 
{ 
    std::wstring temp(s.length(), L' '); 
    std::copy(s.begin(), s.end(), temp.begin()); 
    return temp; 
} 

//////////////////////////////////////////////////////////////////////// 
// Builds the stored procedure query. 

std::wstring database_con::buildQuery(vector<std::wstring> input, string symbol) 
{ 
    std::wstringstream builder; 
    builder << L"EXEC sp_addHistorical " << "@Symbol='" << L"" << StringToWString(symbol) << "'," << 
     "@Date='" << (wstring)L"" << input.at(0) << "'," << 
     "@Open=" << (wstring)L"" << input.at(1) << "," << 
     "@Close=" << (wstring)L"" << input.at(2) << "," << 
     "@MaxPrice=" << (wstring)L"" << input.at(3) << "," << 
     "@MinPrice=" << (wstring)L"" << input.at(4) << "," << 
     "@Volume=" << (wstring)L"" << input.at(5) << ";"; 
    return builder.str(); 
} 

void database_con::executeQuery(wstring query) { 

    if (SQL_SUCCESS != SQLExecDirectW(stmt, const_cast<SQLWCHAR*>(query.c_str()), SQL_NTS)) { 
     std::cout << "Execute error " << std::endl; 
     show_error(SQL_HANDLE_STMT, stmt); 
     std::wcout << L"Unsuccessful Query: " << query << std::endl; 
    } 
    // Close Cursor before next iteration starts: 
    SQLRETURN closeCursRet = SQLFreeStmt(stmt, SQL_CLOSE); 
    if (!SQL_SUCCEEDED(closeCursRet)) 
    { 
     show_error(SQL_HANDLE_STMT, stmt); 
     // maybe add some handling for the case that closing failed. 
    } 
} 
//////////////////////////////////////////////////////////////////////// 
// Constructs a database connector object with the historical data and its symbol 

database_con::database_con(std::vector<std::vector<std::wstring>> historical, string symbol){ 
    /* 
    Set up the handlers 
    */ 

    /* Allocate an environment handle */ 
    SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env); 
    /* We want ODBC 3 support */ 
    SQLSetEnvAttr(env, SQL_ATTR_ODBC_VERSION, (void *)SQL_OV_ODBC3, 0); 
    /* Allocate a connection handle */ 
    SQLAllocHandle(SQL_HANDLE_DBC, env, &dbc); 

    /* Connect to the DSN */ 
    SQLDriverConnectW(dbc, NULL, L"DRIVER={SQL Server};SERVER=ERA-PC-STUART\\JBK_DB;DATABASE=master;UID=geo;PWD=kalle123;", SQL_NTS, NULL, 0, NULL, SQL_DRIVER_COMPLETE); 
    /* Check for success */ 
    if (SQL_SUCCESS != SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt)) 
    { 
     show_error(SQL_HANDLE_DBC, dbc); 
     std::cout << "Failed to connect"; 
    } 
    std::cout << "Building and executing the query" << std::endl; 

    for (_mVecHistIter = historical.begin(); 
     _mVecHistIter != historical.end(); 
     _mVecHistIter+5) { 
     std::thread t(&database_con::executeQuery, *this, buildQuery(*_mVecHistIter, symbol)); 
     std::thread t2(&database_con::executeQuery, *this, buildQuery(*_mVecHistIter, symbol)); 
     std::thread t3(&database_con::executeQuery, *this, buildQuery(*_mVecHistIter, symbol)); 
     std::thread t4(&database_con::executeQuery, *this, buildQuery(*_mVecHistIter, symbol)); 
     std::thread t5(&database_con::executeQuery, *this, buildQuery(*_mVecHistIter, symbol)); 
     t.join(); 
     t2.join(); 
     t3.join(); 
     t4.join(); 
     t5.join(); 

     //executeQuery(buildQuery(*_mVecHistIter, symbol)); 

    } 

    /*_mSymbol = symbol; 
    std::wstringstream stream(StringToWString(historical)); 
    std::wstring line; 
    int row = 0; 
    while (std::getline(stream, line)) { 
     if (row > 0) { 
      vector<wstring> vHistorical = parseData(L"" + line, ','); 
      std::wstring SQL = buildQuery(vHistorical, _mSymbol); 
      if (SQL_SUCCESS != SQLExecDirectW(stmt, const_cast<SQLWCHAR*>(SQL.c_str()), SQL_NTS)) { 
       std::cout << "Execute error " << std::endl; 
       show_error(SQL_HANDLE_STMT, stmt); 
       std::wcout << L"Unsuccessful Query: " << SQL << std::endl; 
      } 
      // Close Cursor before next iteration starts: 
      SQLRETURN closeCursRet = SQLFreeStmt(stmt, SQL_CLOSE); 
      if (!SQL_SUCCEEDED(closeCursRet)) 
      { 
       show_error(SQL_HANDLE_STMT, stmt); 
       // maybe add some handling for the case that closing failed. 
      } 
     } 
     row++; 
    }*/ 
    std::cout << "Query " << _mSymbol << " ready" << std::endl; 

} 

database_con::~database_con() { 
    std::cout << "The database object has been deleted" << std::endl; 
} 

Antwort

0

Ich denke, ein Grund für die Fehler könnte sein, dass alle Ihre Threads dasselbe Anweisung-handle verwenden. Der erste Thread öffnet einen Cursor auf diesem Anweisungshandle, indem er eine Abfrage ausführt. Während die Abfrage ausgeführt wird, öffnet ein anderer Thread eine Abfrage für dasselbe Anweisungshandle, wodurch die Ergebnisse des ersten Threads wahrscheinlich ungültig werden.

Ich würde versuchen, zu jedem Thread seine eigene Aussage Griff verwenden zu machen, indem Sie Ihre executeQuery etwas wie das Ändern:

void database_con::executeQuery(wstring query) 
{ 
    // Allocate a new statement to be used: 
    SQLHSTMT localStmt = SQL_NULL_HSTMT; 
    SQLRETURN ret = SQLAllocHandle(SQL_HANDLE_STMT, dbc, &localStmt); 
    if (!SQL_SUCCEEDED(ret)) 
    { 
     // show_error(SQL_HANDLE_STMT, localStmt); 
     // would need some show_error that can work with the dbc handle, 
     // as the stmt-handle is invalid and therefore no error info can 
     // be fetched 
     std::cout << "Allocating stmt handle failed" << std::endl; 
     return; 
    } 
    // Do the work on that statement now 
    ret = SQLExecDirectW(localStmt, const_cast<SQLWCHAR*>(query.c_str()), SQL_NTS); 
    if(!SQL_SUCCEEDED(ret)) 
    { 
     std::cout << "Execute error " << std::endl; 
     show_error(SQL_HANDLE_STMT, localStmt); 
     std::wcout << L"Unsuccessful Query: " << query << std::endl; 
    } 
    // And finally free the statement created for this thread 
    ret = SQLFreeHandle(SQL_HANDLE_STMT, localStmt); 
    if (!SQL_SUCCESS(ret)) 
    { 
     std::cout << L"Freeing thread handle failed" << std::endl; 
     show_error(SQL_HANDLE_STMT, localStmt); 
    } 
}