pthreads/{position_race/position_race.c → producer_consumer/producer_consumer.c} RENAMED
@@ -1,103 +1,178 @@
1
  #include <pthread.h>
 
2
  #include <stdio.h>
3
  #include <stdlib.h>
4
  #include <time.h>
5
  #include <unistd.h>
6
 
7
  typedef struct
8
  {
9
- size_t thread_count;
10
- pthread_mutex_t position_mutex;
11
- size_t position;
 
 
 
 
 
 
 
12
  } shared_data_t;
13
 
14
- typedef struct
15
- {
16
- size_t thread_num;
17
- shared_data_t* shared_data;
18
- } private_data_t;
19
-
20
  int create_threads(shared_data_t* shared_data);
21
- void* run(void* data);
 
 
22
 
23
 
24
  int main(int argc, char* argv[])
25
  {
 
26
  shared_data_t* shared_data = (shared_data_t*) calloc(1, sizeof(shared_data_t));
27
  if ( shared_data == NULL )
28
  return (void)fprintf(stderr, "error: could not allocate shared memory\n"), 1;
29
 
30
- shared_data->thread_count = sysconf(_SC_NPROCESSORS_ONLN);
31
- if ( argc >= 2 )
32
- shared_data->thread_count = strtoull(argv[1], NULL, 10);
33
-
34
- pthread_mutex_init(&shared_data->position_mutex, /*attr*/ NULL);
35
- shared_data->position = 0;
 
 
 
36
 
37
  struct timespec start_time;
38
  clock_gettime(CLOCK_MONOTONIC, &start_time);
39
 
40
- int error = create_threads(shared_data);
41
- if ( error )
42
- return error;
43
-
44
  struct timespec finish_time;
45
  clock_gettime(CLOCK_MONOTONIC, &finish_time);
46
 
47
  double elapsed_seconds = finish_time.tv_sec - start_time.tv_sec
48
  + 1e-9 * (finish_time.tv_nsec - start_time.tv_nsec);
49
 
50
- printf("Hello execution time %.9lfs\n", elapsed_seconds);
 
 
 
 
 
 
 
 
 
 
 
51
 
52
- pthread_mutex_destroy(&shared_data->position_mutex);
53
  free(shared_data);
54
- return 0;
55
  }
56
 
57
- int create_threads(shared_data_t* shared_data)
 
 
58
  {
59
- pthread_t* threads = (pthread_t*) malloc(shared_data->thread_count * sizeof(pthread_t));
60
- if ( threads == NULL )
61
- return (void)fprintf(stderr, "error: could not allocate memory for %zu threads\n", shared_data->thread_count), 2;
 
 
62
 
63
- private_data_t* private_data = (private_data_t*) calloc(shared_data->thread_count, sizeof(private_data_t));
64
- if ( private_data == NULL )
65
- return (void)fprintf(stderr, "error: could not allocate private memory for %zu threads\n", shared_data->thread_count), 3;
66
 
67
- for ( size_t index = 0; index < shared_data->thread_count; ++index )
68
- {
69
- private_data[index].thread_num = index;
70
- private_data[index].shared_data = shared_data;
71
- pthread_create(&threads[index], NULL, run, &private_data[index]);
 
 
 
 
 
 
 
 
 
 
 
 
 
72
  }
73
 
74
- pthread_mutex_lock(&shared_data->position_mutex);
75
- printf("Hello world from main thread\n");
76
- pthread_mutex_unlock(&shared_data->position_mutex);
 
 
 
 
77
 
78
- for ( size_t index = 0; index < shared_data->thread_count; ++index )
79
- pthread_join(threads[index], NULL);
80
 
81
- free(private_data);
82
- free(threads);
83
  return 0;
84
  }
85
 
86
- void* run(void* data)
87
  {
88
- private_data_t* private_data = (private_data_t*)data;
89
- shared_data_t* shared_data = private_data->shared_data;
90
 
91
- size_t thread_num = (*private_data).thread_num;
92
- size_t thread_count = shared_data->thread_count;
 
 
 
93
 
94
- pthread_mutex_lock(&shared_data->position_mutex);
95
 
96
- ++shared_data->position;
97
- fprintf(stdout, "Thread %zu/%zu: I arrived at position %zu\n", thread_num
98
- , thread_count, shared_data->position);
99
 
100
- pthread_mutex_unlock(&shared_data->position_mutex);
 
 
 
 
 
 
101
 
102
  return NULL;
103
  }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  #include <pthread.h>
2
+ #include <semaphore.h>
3
  #include <stdio.h>
4
  #include <stdlib.h>
5
  #include <time.h>
6
  #include <unistd.h>
7
 
8
  typedef struct
9
  {
10
+ size_t buffer_size;
11
+ double* buffer;
12
+ size_t rounds;
13
+ useconds_t min_producer_delay;
14
+ useconds_t max_producer_delay;
15
+ useconds_t min_consumer_delay;
16
+ useconds_t max_consumer_delay;
17
+ sem_t producer_semaphore;
18
+ sem_t consumer_semaphore;
19
+ pthread_mutex_t stdout_mutex;
20
  } shared_data_t;
21
 
22
+ int analyze_arguments(int argc, char* argv[], shared_data_t* shared_data);
 
 
 
 
 
23
  int create_threads(shared_data_t* shared_data);
24
+ void* produce(void* data);
25
+ void* consume(void* data);
26
+ void random_sleep(useconds_t min_milliseconds, useconds_t max_milliseconds);
27
 
28
 
29
  int main(int argc, char* argv[])
30
  {
31
+ srand( time(NULL) );
32
  shared_data_t* shared_data = (shared_data_t*) calloc(1, sizeof(shared_data_t));
33
  if ( shared_data == NULL )
34
  return (void)fprintf(stderr, "error: could not allocate shared memory\n"), 1;
35
 
36
+ int error = analyze_arguments(argc, argv, shared_data);
37
+ if ( error == 0 )
38
+ {
39
+ shared_data->buffer = (double*) calloc(shared_data->buffer_size, sizeof(double));
40
+ if ( shared_data->buffer )
41
+ {
42
+ sem_init(&shared_data->producer_semaphore, 0 /*pshared*/, shared_data->buffer_size);
43
+ sem_init(&shared_data->consumer_semaphore, 0 /*pshared*/, 0);
44
+ pthread_mutex_init(&shared_data->stdout_mutex, /*attr*/ NULL);
45
 
46
  struct timespec start_time;
47
  clock_gettime(CLOCK_MONOTONIC, &start_time);
48
 
49
+ error = create_threads(shared_data);
50
+ if ( error == 0 )
51
+ {
 
52
  struct timespec finish_time;
53
  clock_gettime(CLOCK_MONOTONIC, &finish_time);
54
 
55
  double elapsed_seconds = finish_time.tv_sec - start_time.tv_sec
56
  + 1e-9 * (finish_time.tv_nsec - start_time.tv_nsec);
57
 
58
+ printf("Simulation time %.9lfs\n", elapsed_seconds);
59
+ }
60
+
61
+ pthread_mutex_destroy(&shared_data->stdout_mutex);
62
+ free(shared_data->buffer);
63
+ }
64
+ else
65
+ {
66
+ fprintf(stderr, "error: could not allocate memory for %zu products\n", shared_data->buffer_size);
67
+ error = 2;
68
+ }
69
+ }
70
 
 
71
  free(shared_data);
72
+ return error;
73
  }
74
 
75
+ int analyze_arguments(int argc, char* argv[], shared_data_t* shared_data)
76
+ {
77
+ if ( argc != 7 )
78
  {
79
+ fprintf(stderr, "usage: producer_consumer buffer_size rounds"
80
+ " min_producer_delay max_producer_delay"
81
+ " min_consumer_delay max_consumer_delay\n");
82
+ return 1;
83
+ }
84
 
85
+ shared_data->buffer_size = strtoull(argv[1], NULL, 10);
86
+ if ( shared_data->buffer_size == 0 )
87
+ return 2;
88
 
89
+ if ( sscanf(argv[2], "%zu", &shared_data->rounds) != 1 || shared_data->rounds == 0 )
90
+ return (void)fprintf(stderr, "invalid rounds: %s\n", argv[2]), 2;
91
+
92
+ if ( sscanf(argv[3], "%u", &shared_data->min_producer_delay) != 1 )
93
+ return (void)fprintf(stderr, "invalid min producer delay: %s\n", argv[3]), 3;
94
+
95
+ if ( sscanf(argv[4], "%u", &shared_data->max_producer_delay) != 1
96
+ || shared_data->max_producer_delay < shared_data->min_producer_delay )
97
+ return (void)fprintf(stderr, "invalid max producer delay: %s\n", argv[4]), 4;
98
+
99
+ if ( sscanf(argv[5], "%u", &shared_data->min_consumer_delay) != 1 )
100
+ return (void)fprintf(stderr, "invalid min consumer delay: %s\n", argv[5]), 5;
101
+
102
+ if ( sscanf(argv[6], "%u", &shared_data->max_consumer_delay) != 1
103
+ || shared_data->max_consumer_delay < shared_data->min_consumer_delay )
104
+ return (void)fprintf(stderr, "invalid max consumer delay: %s\n", argv[6]), 6;
105
+
106
+ return EXIT_SUCCESS;
107
  }
108
 
109
+ int create_threads(shared_data_t* shared_data)
110
+ {
111
+ pthread_t producer_thread;
112
+ pthread_t consumer_thread;
113
+
114
+ pthread_create(&producer_thread, NULL, produce, shared_data);
115
+ pthread_create(&consumer_thread, NULL, consume, shared_data);
116
 
117
+ pthread_join(producer_thread, NULL);
118
+ pthread_join(consumer_thread, NULL);
119
 
 
 
120
  return 0;
121
  }
122
 
123
+ void* produce(void* data)
124
  {
125
+ shared_data_t* shared_data = (shared_data_t*)data;
 
126
 
127
+ for ( size_t round = 1; round <= shared_data->rounds; ++round )
128
+ {
129
+ for ( size_t index = 0; index < shared_data->buffer_size; ++index )
130
+ {
131
+ sem_wait(&shared_data->producer_semaphore);
132
 
133
+ random_sleep(shared_data->min_producer_delay, shared_data->max_producer_delay);
134
 
135
+ shared_data->buffer[index] = round + (index + 1) / 100.0;
 
 
136
 
137
+ pthread_mutex_lock(&shared_data->stdout_mutex);
138
+ printf("Produced %.2lf\n", shared_data->buffer[index]);
139
+ pthread_mutex_unlock(&shared_data->stdout_mutex);
140
+
141
+ sem_post(&shared_data->consumer_semaphore);
142
+ }
143
+ }
144
 
145
  return NULL;
146
  }
147
+
148
+ void* consume(void* data)
149
+ {
150
+ shared_data_t* shared_data = (shared_data_t*)data;
151
+
152
+ for ( size_t round = 1; round <= shared_data->rounds; ++round )
153
+ {
154
+ for ( size_t index = 0; index < shared_data->buffer_size; ++index )
155
+ {
156
+ sem_wait(&shared_data->consumer_semaphore);
157
+
158
+ random_sleep(shared_data->min_consumer_delay, shared_data->max_consumer_delay);
159
+
160
+ pthread_mutex_lock(&shared_data->stdout_mutex);
161
+ printf("\t\t\tConsumed %.2lf\n", shared_data->buffer[index]);
162
+ pthread_mutex_unlock(&shared_data->stdout_mutex);
163
+
164
+ sem_post(&shared_data->producer_semaphore);
165
+ }
166
+ }
167
+
168
+ return NULL;
169
+ }
170
+
171
/**
 * Suspends the calling thread for a pseudo-random number of milliseconds.
 *
 * The duration is min_milliseconds plus rand() modulo the width of the range,
 * so it lies in [min_milliseconds, max_milliseconds); when the maximum is not
 * greater than the minimum the thread sleeps exactly min_milliseconds.
 *
 * @param min_milliseconds Shortest sleep, in milliseconds.
 * @param max_milliseconds Exclusive upper bound of the sleep, in milliseconds.
 */
void random_sleep(useconds_t min_milliseconds, useconds_t max_milliseconds)
{
  useconds_t duration = min_milliseconds;

  // Comparing before subtracting keeps an inverted range from wrapping around
  // to a huge unsigned value
  if ( max_milliseconds > min_milliseconds )
    duration += (useconds_t)rand() % (max_milliseconds - min_milliseconds);

  // nanosleep() takes seconds and nanoseconds separately, so one second or
  // more is fine; usleep(1000 * duration) could overflow useconds_t and is
  // only specified for arguments below 1,000,000 microseconds
  struct timespec request;
  request.tv_sec = (time_t)(duration / 1000);
  request.tv_nsec = (long)(duration % 1000) * 1000000L;
  nanosleep(&request, NULL);
}