Потоки

SotM

Участник
Проверенный
Исходник

Глючит раскраска что-то на форуме.
При запуске стартуют несколько потоков, но вот они почему-то стартуют одновременно и выводится на экран нулевой элемент. Т.е. этот элемент обрабатывается несколько раз, хотя должен обрабатываться только один раз.
Однако все последующие элементы обрабатываются по одному разу. В функцию для потока я добавил случайную задержку, типа эмуляция какого-нибудь важного процесса.
Если увеличить количество потоков (NumThreads), то нулевой элемент массива тоже будет обрабатываться бОльшее количество раз.
В чём прикол?
 
Последнее редактирование:

ProFrager

Знаток
Проверенный
кинь exe'шник, а то на работе нечем компилить.И я бы использовал Эвент вместо мутекса, т.к. не работал с последними.
 

Krinkels

Он где то тут
Администратор
Такс, у меня вроде заработало, вот код

Немного исправлена функция
[SOURCE="c"]// Функция для потока
DWORD WINAPI ForThread( LPVOID lpParam )
{
TCHAR *FileName;

UNREFERENCED_PARAMETER(lpParam);

mutex_lock( &hMutex );
while ( iCurrEntry < MaxElements )
{
FileName = Files[ iCurrEntry ];
iCurrEntry++;
mutex_unlock( &hMutex );

printf( "%s / %d (waited: %d ms)\n", FileName, MaxElements, RandomWait() );

mutex_lock( &hMutex );
}
mutex_unlock( &hMutex );

return 0;
}[/SOURCE]
 

ProFrager

Знаток
Проверенный
Krinkels, у меня то же предположение было, но похоже только вечером смогу что-либо посмотреть
 

SotM

Участник
Проверенный
Самое главное не понятно: почему в первом случае не работает, а во втором работает. Хотя код можно сказать один и тот же.
 

ProFrager

Знаток
Проверенный
SotM, пока чуть времени нашлось решил посмотреть, но облом) ехе'шник требует дллку gcc'шную)
В коде
Код:
        FileName = Files[ iCurrEntry ];
        mutex_unlock( &hMutex );
 
        printf( "%s / %d (waited: %d ms)\n", FileName, MaxElements, RandomWait() );
 
        mutex_lock( &hMutex );
        iCurrEntry++;
не знаю каким именно образом пашут эти мутексы, но если по аналогии с эвентами и симафорами, то могу сказать, что когда ты в цикле получаешь имя файла FileName = Files[ iCurrEntry ]; а потом освобождаешь мутекс другие потоки сразу же бросаются делать то же самое, т.е. берут имя файла из того же элемента массива iCurrEntry, поэтому
Код:
FileName = Files[ iCurrEntry ];
iCurrEntry++;
должны быть вместе, и находиться до mutex_unlock( &hMutex );
Более точно смогу посмотреть только при наличии работающего ехе'шника
 
Последнее редактирование:

SotM

Участник
Проверенный
пока чуть времени нашлось решил посмотреть, но облом) ехе'шник требует дллку gcc'шную)
Хе, вот если бы ты еще сказал какую именно :)
Ладно, перекомпилировал файл (с другими опциями линковщика) и заново выложил сюды

Не совсем понял твои рассуждения.
Пускай функция выглядит так:
[SOURCE="cpp"]DWORD WINAPI ForThread( LPVOID lpParam )
{
char FileName[ MAX_PATH ] = {0};

UNREFERENCED_PARAMETER(lpParam);

mutex_lock( &hMutex );
while ( iCurrEntry < MaxElements )
{
strcpy( FileName, Files[ iCurrEntry ] );
mutex_unlock( &hMutex );

printf( "%s / %d (waited: %d ms)\n", FileName, MaxElements, RandomWait() );

mutex_lock( &hMutex );
iCurrEntry++;
}
mutex_unlock( &hMutex );

return 0;
}[/SOURCE]

Т.е. мы из shared массива копируем имя файла в локальную переменную и дальше с ней делаем всё что хотим.
Вроде теоретически это так.
 

ProFrager

Знаток
Проверенный
SotM,
Т.е. мы из shared массива копируем имя файла в локальную переменную и дальше с ней делаем всё что хотим.
Вроде теоретически это так.
В общем почитал про мутексы в мсдн, теперь понятно для чего они. Так вот. Да, мы копируем имя файла, но потом идет макрос mutex_unlock, который дает возможность другому потоку выйти из ожидания и пока предыдущий поток печатает инфу через printf, следующий поток начинает копировать данные из ТОЙ ЖЕ ячейки массива! Странно, что у тебя только первый элемент 4 раза проходит, вообще все должны по 4 раза пройти, видимо из-за того, что printf первый раз чего то инициализирует у себя, поэтому так долго отрабатывает. По уму должно быть именно так, как написал Krinkels - добавление индекса должно быть до освобождения мутекса.

Добавлено через 10 минут
З.Ы. файрвол яростно негодовал над действиями данного приложения :D
 

SotM

Участник
Проверенный
Хммм, может я чего-то не понимаю. Но я считал, что другой поток запускает копию функции (ForThread), т.е. эта функция стартует уже с другим буфером FileName (с другим адресом). Или же получается, что все потоки будут использовать всё те же локальные (т.е. те которые объявлены внутри ForThread) переменные?

Upd. Ради интереса сделал такую фишку:
[SOURCE="cpp"] UNREFERENCED_PARAMETER(lpParam);

printf( "ForThread(): 0x%X\n", (int)&FileName[0] );

mutex_lock( &hMutex );
while ( iCurrEntry < MaxElements )
[/SOURCE]
На экране появилось четыре "ForThread() ...", и все четыре с разными адресами.

И также вопрос немного в другую сторону, является ли мьютекс в данном примере оптимальным решением? Или вместо него можно использовать что-то другое. (CriticalSection, for example).
 
Последнее редактирование:

ProFrager

Знаток
Проверенный
SotM, ты не понял сути проблемы. Для каждого потока создается свой стек, так что ты не в ту сторону роешь. Проблема не с переменной FileName, ее содержимым, или чего либо другим, связанным с массивом, проблема со своевременным обновлением глобальной переменной iCurrEntry. Для другого потока при выходе из ожидания она будет содержать то же значение, что и у предыдущего, потому что ты не успел добавить к ней единицу.

Добавлено через 5 минут
А на счет CriticalSection, так в данном случае это то же самое получается, что и mutex.
 

SotM

Участник
Проверенный
Кажется понятненько.

Тогда для тех, кому интересно, еще раз приведу ту часть кода, которая поменялась и которая работает правильно.
[SOURCE="cpp"]// Функция для потока
DWORD WINAPI ForThread( LPVOID lpParam )
{
TCHAR *FileName;

UNREFERENCED_PARAMETER( lpParam );

mutex_lock( &hMutex );
while ( iCurrEntry < MaxElements )
{
FileName = Files[ iCurrEntry ];
iCurrEntry++;
mutex_unlock( &hMutex );

printf( "%s / %d (waited: %d ms)\n", FileName, MaxElements, RandomWait() );

mutex_lock( &hMutex );
}
mutex_unlock( &hMutex );

return 0;
}[/SOURCE]

Другие вопросы:
1. Количество создаваемых потоков должно быть равным количеству ядер в процессоре? (для достижения максимального использования проца).
2. Является ли мьютекс достаточно быстрым? Ты сказал, что другое можно и не использовать. Но может по скорости есть что-то еще.

 
Последнее редактирование:

ProFrager

Знаток
Проверенный
можно слегка упростить:
[SOURCE="cpp"]// Функция для потока
DWORD WINAPI ForThread( LPVOID lpParam )
{
TCHAR *FileName;

UNREFERENCED_PARAMETER( lpParam );

while ( iCurrEntry < MaxElements )
{
mutex_lock( &hMutex );
FileName = Files[ iCurrEntry ];
iCurrEntry++;
mutex_unlock( &hMutex );

printf( "%s / %d (waited: %d ms)\n", FileName, MaxElements, RandomWait() );
}
return 0;
}[/SOURCE]
и пару замечаний: а для чего надо UNREFERENCED_PARAMETER(..)? Не юзается параметр и ничего плохого в этом нет;
мб си сама определяет где функция потока и при return вызывает нужную функцию, но вообще я обычно использую ExitThread(x), иначе у тред меня некорректно завершался. Мож я тогда чего накосячил, но с тех пор для тредов использую всегда ExitThread.

1. Количество создаваемых потоков должно быть равным количеству ядер в процессоре? (для достижения максимального использования проца).
если все потоки загружают проц ядро на все 100, то должно хватить, но так не часто бывает, поэтому приходится увеличивать число потоков для полной нагрузки. Но с другой стороны если их число задать слишком большим, то возрастут потери времени на переключение между потоками. Так что зависит от ситуации, я бы задавал в пределах [NumCores ; NumCores*1.5].

2. Является ли мьютекс достаточно быстрым? Ты сказал, что другое можно и не использовать. Но может по скорости есть что-то еще.
а других вариатов нет) все потоковые операции реализуется нулевым кольцом винды. Все mutex'ы, event'ы, critical section и т.д. работают посредством одних и тех же внутренних функций системы.

И оффтопик в данной теме: ты не имел ли дело с CUDA?
не, в такие дебри еще не залезал, но говорят программирование на нем схоже с программированием на ассемблере.
 
Последнее редактирование:

SotM

Участник
Проверенный
Шоб компилятор warning не выдавал, типа "не используете аргумент".
Я люблю когда компилятор практически никаких warning'ов не выдает :)

Здесь такое не нужно. Если всё в порядке, то функция ForThread возвращает 0, а если ошибка то 1.

можно слегка упростить:
А разве не нужно блокировать мьютекс до того, как мы проверяем общую переменную в этом месте:
[source="cpp"]while ( iCurrEntry < MaxElements )[/source]
 

Булат Зиганшин

Developer
Модератор
1. критические секции быстрее, им не нужно лезть на 0-й уровень привилегий, поскольку они работают только внутри одного процесса

2. число потоков высчитывается исходя из 100%-ной загрузки всех ядер. если проц без HT - то просто можно сложить на сколько процентов загружает cpu каждый поток в программе и отсюда посчитать

3. то, как ты программируешь многопоточность - каменный век. есть более высокоуровневые парадигмы, и их можно реализовать даже если ты привязан к c++
 

Krinkels

Он где то тут
Администратор
С критическими секциями наверное будет так:
[SOURCE="cpp"]#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <stdlib.h>
#include <stdio.h>
#include <time.h>

#define NumThreads 5
#define MaxElements 20

#define mutex_lock(mutex) (WaitForSingleObject((*(mutex)), INFINITE))
#define mutex_unlock(mutex) (ReleaseMutex(*(mutex)))
#define mutex_destroy(mutex) (CloseHandle(*(mutex)))

HANDLE ghThreads[ NumThreads ];
int iCurrEntry = 0;

char *Files[ MaxElements ];
typedef HANDLE mutex_t;
mutex_t hMutex;

CRITICAL_SECTION g_cs;

int mutex_init( mutex_t *mutex )
{
*mutex = CreateMutex( NULL, FALSE, NULL );
return *mutex == NULL;
}

// Ждём случайное количество мс
int RandomWait( void )
{
int wait_Ms;
clock_t end_time;

wait_Ms = rand() % 500 + 1;
//wait_Ms = 1000;
//wait_Ms = 500;
end_time = clock() + wait_Ms;
while ( clock() < end_time )
{};
return wait_Ms;
}

// Функция для потока
DWORD WINAPI ForThread( LPVOID lpParam )
{
TCHAR *FileName;

UNREFERENCED_PARAMETER( lpParam );

//mutex_lock( &hMutex );
EnterCriticalSection( &g_cs );
while ( iCurrEntry < MaxElements )
{
FileName = Files[ iCurrEntry ];
iCurrEntry++;
LeaveCriticalSection( &g_cs );
//mutex_unlock( &hMutex );

printf( "%s / %d (waited: %d ms)\n", FileName, MaxElements, RandomWait() );

//mutex_lock( &hMutex );
EnterCriticalSection( &g_cs );
}
LeaveCriticalSection( &g_cs );
//mutex_unlock( &hMutex );

return 0;
}

// Создаем потоки
void CreateThreads(void)
{
int i;
//mutex_init( &hMutex );
InitializeCriticalSection( &g_cs );

for( i = 0; i < NumThreads; i++ )
{
ghThreads = CreateThread(NULL, // default security
0, // default stack size
ForThread, // name of the thread function
NULL, // no thread parameters
0, // default startup flags
NULL);

printf( "thread %d / %d\n", i, NumThreads );

if ( ghThreads == NULL )
{
printf( "CreateThread failed (%d)\n", (int)GetLastError() );
return;
}
}
}

// Закрываем все хэндлы потоков
void CloseEvents( void )
{
int i;

for ( i = 0; i < NumThreads; i++ )
CloseHandle( ghThreads[ i ] );

//mutex_destroy( &hMutex );
}

// Заполняем массив
void PopulateList( char **List )
{
int i;
char buffer[128];

for ( i = 0; i < MaxElements; i++ )
{
sprintf( buffer, "%3d", i );
List = strdup( buffer );
}
}

// Освобождаем память
void FreeList( char **List )
{
int i;
for ( i = 0; i < MaxElements; i++ )
free( List );
}

// Главная функция
int main( void )
{
DWORD dwWaitResult;

/* initialize random seed: */
srand ( time(NULL) );

PopulateList( Files );

CreateThreads();

dwWaitResult = WaitForMultipleObjects(
NumThreads, // number of handles in array
ghThreads, // array of thread handles
TRUE, // wait until all are signaled
INFINITE);

switch (dwWaitResult)
{
// All thread objects were signaled
case WAIT_OBJECT_0:
printf( "All threads ended, cleaning up for application exit...\n" );
break;

// An error occurred
default:
printf( "WaitForMultipleObjects failed (%d)\n", (int)GetLastError() );
return 1;
}

CloseEvents();
FreeList( Files );

return EXIT_SUCCESS;
}[/SOURCE]
 

SotM

Участник
Проверенный
1. критические секции быстрее, им не нужно лезть на 0-й уровень привилегий, поскольку они работают только внутри одного процесса
Хмм, серьезно? Надо мне будет написать какой нить тест, чтобы увидеть это своими глазами. :)

2. число потоков высчитывается исходя из 100%-ной загрузки всех ядер. если проц без HT - то просто можно сложить на сколько процентов загружает cpu каждый поток в программе и отсюда посчитать
Таким образом можно это сделать динамически, т.е. чтобы сама программа могла получить процент загруженности проца и сама создала нужное количество потоков? Я как-то думал об этом.

3. то, как ты программируешь многопоточность - каменный век. есть более высокоуровневые парадигмы, и их можно реализовать даже если ты привязан к c++
Я учился по примерам с инета :) Это очень ясный и простой пример, может кому-то еще он поможет. :)
А что ты предлагаешь? Мне аж интересно, всегда есть место для улучшений. :)
 
Сверху