-
Notifications
You must be signed in to change notification settings - Fork 0
/
mach.c
219 lines (185 loc) · 6.68 KB
/
mach.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include "dep/sem.h"
#include "queue.h"
#include "dep/run.h"
#define MAX_LINE_LENGTH 4096
#define TERMINATION_SIGNAL 0
#define ERROR_SIGNAL -1
#define RUNNING_SIGNAL 1
#define COMPLETED_SIGNAL 2
static SEM *blockSemaphore;
static void die(const char *s) {
perror(s);
exit(EXIT_FAILURE);
}
static int parse_positive_int_or_die(char *str) {
errno = 0;
char *endptr;
long x = strtol(str, &endptr, 10);
if (errno != 0) {
die("invalid number");
}
// Non empty string was fully parsed
if (str == endptr || *endptr != '\0') {
fprintf(stderr, "invalid number\n");
exit(EXIT_FAILURE);
}
if (x <= 0) {
fprintf(stderr, "number not positive\n");
exit(EXIT_FAILURE);
}
if (x > INT_MAX) {
fprintf(stderr, "number too large\n");
exit(EXIT_FAILURE);
}
return (int)x;
}
static void *lineThread(void *command) {
//adds command to queue, runs command, passes its output to queue to be printed, then indicates being ready by calling V() on blockSemaphore
pthread_detach(pthread_self());
char *commandString = (char *)command;
//adds cmd to queue
if(queue_put(commandString, NULL, RUNNING_SIGNAL)){
//error in queue_put
perror("queue_put");
exit(EXIT_FAILURE);
}
//free(commandString);
char *out;
int output = run_cmd(commandString, &out);
if(output >= 0) {
queue_put(commandString, out, COMPLETED_SIGNAL);
}else{
//Failure
queue_put(commandString, NULL, ERROR_SIGNAL);
}
V(blockSemaphore);
return NULL;
}
static void *printThread(void *n) {
//will print anytime a new entry is available in queue.
char *cmd, *out;
int flags;
while(1){
if (queue_get(&cmd, &out, &flags)) {
perror("queue_get");
exit(EXIT_FAILURE);
}
if(flags == TERMINATION_SIGNAL) {
pthread_exit(NULL);
}
if(flags == RUNNING_SIGNAL) {
printf("Running '%s' ...\n", cmd);
}else if(flags == COMPLETED_SIGNAL){
printf("Completed '%s': \"%s\".\n", cmd, out);
free(out);
free(cmd);
}else if(flags == ERROR_SIGNAL){
printf("Failure '%s'\n", cmd);
free(out);
free(cmd);
}
}
}
void waitForThreads() {
P(blockSemaphore); //waits until every thread has called its V()-function
}
int main(int argc, char **argv) {
if(queue_init()) {
perror("queue_init");
exit(EXIT_FAILURE);
}
/*
start output thread
*/
pthread_t printThreadID;
pthread_create(&printThreadID, NULL, &printThread, NULL);
//parse argv
if(argc != 3) {
fprintf(stderr, "usage: mach <number of threads> <mach-file>");
exit(EXIT_FAILURE);
}
int maxNumberOfThreads = parse_positive_int_or_die(argv[1]);
char *makeFilePath = argv[2];
//open file
char *readMode = "r";
FILE *machFileStream;
if((machFileStream = fopen(makeFilePath, readMode)) == NULL) {
perror("fopen");
exit(EXIT_FAILURE);
}
char line[MAX_LINE_LENGTH + 1];
int linesUntilEmptyLine = 0;
blockSemaphore = semCreate(-maxNumberOfThreads); //Semaphore that blocks execution of the next block until every thread has called its V()-function
if(!blockSemaphore) {
die("semCreate");
}
pthread_t threadIDs[maxNumberOfThreads]; //array of thread id´s
while(1) {
fgets(line, MAX_LINE_LENGTH + 2, machFileStream);
//error handling for fgets
if(ferror(machFileStream)) {
perror("fgets");
exit(EXIT_FAILURE);
}else if (feof(machFileStream)) {
for (int i = 0; i < (maxNumberOfThreads - linesUntilEmptyLine) + 1; i++) {
V(blockSemaphore);
}
waitForThreads();
break;
}
if(strlen(line) == 1) {
//stop after each empty line/wait for processes have finished (proccesses will call V() function after completion. Need to wait with P() function until Semaphore is 1 again i.e. all proccesses have finished)
//wait for threads to finish and then restart for new block
/*remove "unused" semaphore blockades:*/
for (int i = 0; i < (maxNumberOfThreads - linesUntilEmptyLine) + 1; i++) {
V(blockSemaphore);
}
waitForThreads();
semDestroy(blockSemaphore);
blockSemaphore = semCreate(-maxNumberOfThreads); //reset semaphore
if(!blockSemaphore) {
die("semCreate");
}
linesUntilEmptyLine = 0; //resets lines for new block
continue; //starts new block execution
}
//if not in empty line, remove trailing newline
line[strlen(line)-1] = '\0';
if(linesUntilEmptyLine >= maxNumberOfThreads) {
V(blockSemaphore); //calls V() one time that Semaphore is > 0 after every thread returned
waitForThreads();
//if we wait for threads because the maxium quantity of threads was reached, we still need to handle the current line -> solution: after wait is completed, start new thread with current line and then carry on like normal
semDestroy(blockSemaphore);
blockSemaphore = semCreate(-maxNumberOfThreads); //reset semaphore
if(!blockSemaphore) {
die("semCreate");
}
linesUntilEmptyLine = 0; //resets lines for new block
char *currentLine = strdup(line);
pthread_create(&threadIDs[linesUntilEmptyLine], NULL, &lineThread, currentLine);
linesUntilEmptyLine += 1;
continue;
}
//read individual rows in file and start new thread for each row. also increment lineCounter to indicate already run lines
char *currentLine = strdup(line);
pthread_create(&threadIDs[linesUntilEmptyLine], NULL, &lineThread, currentLine);
linesUntilEmptyLine += 1;
}
//isEOF = 1;
queue_put(NULL, NULL, 0); //indicate to printing task that the file ended and it should terminate (by putting signal into queue, main automatically also waits for queue to exit)
pthread_join(printThreadID, NULL); //wait for printing thread to exit
if(fclose(machFileStream)){
perror("fclose");
//main exits anyway
}
semDestroy(blockSemaphore);
queue_deinit();
return 0;
}