diff --git a/lib/src/core/networking/client.dart b/lib/src/core/networking/client.dart index 60439415..0250b219 100644 --- a/lib/src/core/networking/client.dart +++ b/lib/src/core/networking/client.dart @@ -342,52 +342,30 @@ abstract class OpenAINetworkingClient { required T Function(Map) onSuccess, required Map body, http.Client? client, - }) { - final controller = StreamController(); - + }) async* { try { final clientForUse = client ?? _streamingHttpClient(); - final uri = Uri.parse(to); - final headers = HeadersBuilder.build(); - final httpMethod = OpenAIStrings.postMethod; - final request = http.Request(httpMethod, uri); - request.headers.addAll(headers); - request.body = jsonEncode(body); - Future close() { - return Future.wait([ - if (client == null) Future.delayed(Duration.zero, clientForUse.close), - controller.close(), - ]); - } - OpenAILogger.logStartRequest(to); + try { + final respond = await clientForUse.send(request); - clientForUse - .send(request) - // .timeout( - // OpenAIConfig.requestsTimeOut, - // onTimeout: () { - // throw TimeoutException("Request timed out"); - // }, - // ) - .then( - (respond) { + try { OpenAILogger.startReadStreamResponse(); - final stream = respond.stream .transform(utf8.decoder) .transform(openAIChatStreamLineSplitter); - String respondData = ""; - stream.where((event) => event.isNotEmpty).listen( - (value) { + try { + String respondData = ""; + await for (final value + in stream.where((event) => event.isNotEmpty)) { final data = value; respondData += data; @@ -401,14 +379,10 @@ abstract class OpenAINetworkingClient { final String data = line.substring(6); if (data.contains(OpenAIStrings.streamResponseEnd)) { OpenAILogger.streamResponseDone(); - - return; + break; } - final decoded = jsonDecode(data) as Map; - - controller.add(onSuccess(decoded)); - + yield onSuccess(decoded); continue; } @@ -425,29 +399,24 @@ abstract class OpenAINetworkingClient { final statusCode = respond.statusCode; final exception = RequestFailedException(message, statusCode); - controller.addError(exception); + yield* Stream.error(error); // Error cases sent from openai } } - }, - onDone: () { - close(); - }, - onError: (error, stackTrace) { - controller.addError(error, stackTrace); - }, - ); - }, - onError: (error, stackTrace) { - controller.addError(error, stackTrace); - }, - ).catchError((e) { - controller.addError(e); - }); + } // end of await for + } catch (error, stackTrace) { + yield* Stream.error( + error, stackTrace); // Error cases in handling stream + } + } catch (error, stackTrace) { + yield* Stream.error(error, + stackTrace); // Error cases in decoding stream from response + } + } catch (e) { + yield* Stream.error(e); // Error cases in getting response + } } catch (e) { - controller.addError(e); + yield* Stream.error(e); //Error cases in making request } - - return controller.stream; } static Future imageEditForm({