22 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
33 * SPDX-License-Identifier: Apache-2.0.
44 */
5-
65#pragma once
7-
8- #include < aws/core/Core_EXPORTS.h>
9- #include < aws/core/client/ClientConfiguration.h>
10- #include < aws/core/utils/HashingUtils.h>
11- #include < aws/core/utils/StringUtils.h>
6+ #include < aws/core/http/HttpRequest.h>
127#include < aws/core/utils/Array.h>
8+ #include < aws/core/utils/StringUtils.h>
9+ #include < aws/core/utils/HashingUtils.h>
10+ #include < aws/core/utils/logging/LogMacros.h>
11+ #include < aws/core/utils/memory/stl/AWSStringStream.h>
1312#include < smithy/interceptor/Interceptor.h>
14- #include < smithy/client/common/AwsSmithyClientUtils.h>
13+ #include < aws/core/client/ClientConfiguration.h>
14+ #include < aws/core/utils/Outcome.h>
15+ #include < aws/core/client/AWSError.h>
16+ #include < memory>
1517
1618namespace smithy {
1719namespace client {
1820namespace features {
1921
20- namespace {
21-
22- static const char * const CHECKSUM_HEADER_PREFIX = " x-amz-checksum-" ;
23- static const char * const ALLOCATION_TAG = " ChunkingInterceptor" ;
24- static const size_t DATA_BUFFER_SIZE = 65536 ;
25-
26- } // anonymous namespace
22+ static const size_t AWS_DATA_BUFFER_SIZE = 65536 ;
23+ static const char * ALLOCATION_TAG = " ChunkingInterceptor" ;
24+ static const char * CHECKSUM_HEADER_PREFIX = " x-amz-checksum-" ;
2725
28- class AwsChunkedStreamWrapper {
26+ template <size_t DataBufferSize = AWS_DATA_BUFFER_SIZE>
27+ class AwsChunkedStreamBuf : public std ::streambuf {
2928public:
30- AwsChunkedStreamWrapper (Aws::Http::HttpRequest* request, const std::shared_ptr<Aws::IOStream>& originalBody, size_t bufferSize = DATA_BUFFER_SIZE)
31- : m_streambuf(request, originalBody, bufferSize), m_iostream(&m_streambuf) {}
32-
33- Aws::IOStream* GetIOStream () { return &m_iostream; }
29+ AwsChunkedStreamBuf (Aws::Http::HttpRequest* request,
30+ const std::shared_ptr<Aws::IOStream>& stream,
31+ size_t bufferSize = DataBufferSize)
32+ : m_chunkingStream(Aws::MakeShared<Aws::StringStream>(" AwsChunkedStream" )),
33+ m_request (request),
34+ m_stream(stream),
35+ m_data(bufferSize)
36+ {
37+ assert (m_stream != nullptr );
38+ if (m_stream == nullptr ) {
39+ AWS_LOGSTREAM_ERROR (" AwsChunkedStream" , " stream is null" );
40+ }
41+ assert (m_request != nullptr );
42+ if (m_request == nullptr ) {
43+ AWS_LOGSTREAM_ERROR (" AwsChunkedStream" , " request is null" );
44+ }
3445
35- private:
36- class AwsChunkedStreamBuf : public std ::streambuf {
37- public:
38- AwsChunkedStreamBuf (Aws::Http::HttpRequest* request, const std::shared_ptr<Aws::IOStream>& originalBody, size_t bufferSize)
39- : m_request(request), m_stream(originalBody), m_data(bufferSize), m_bufferSize(bufferSize),
40- m_chunkingStream (Aws::MakeShared<Aws::StringStream>(ALLOCATION_TAG )) {
41- setg ( nullptr , nullptr , nullptr );
46+ setg ( nullptr , nullptr , nullptr );
47+ }
48+
49+ protected:
50+ int_type underflow () override {
51+ if ( gptr () && gptr () < egptr ( )) {
52+ return traits_type::to_int_type (* gptr () );
4253 }
4354
44- protected:
45- int_type underflow () override {
46- if (gptr () < egptr ()) {
47- return traits_type::to_int_type (*gptr ());
55+ // only read and write to chunked stream if the underlying stream
56+ // is still in a valid state
57+ if (m_stream->good ()) {
58+ // Try to read in a 64K chunk, if we cant we know the stream is over
59+ m_stream->read (m_data.GetUnderlyingData (), m_data.GetLength ());
60+ size_t bytesRead = static_cast <size_t >(m_stream->gcount ());
61+ writeChunk (bytesRead);
62+
63+ // if we've read everything from the stream, we want to add the trailer
64+ // to the underlying stream
65+ if ((m_stream->peek () == EOF || m_stream->eof ()) && !m_stream->bad ()) {
66+ writeTrailerToUnderlyingStream ();
4867 }
68+ }
4969
50- if (m_stream->good ()) {
51- m_stream->read (m_data.GetUnderlyingData (), m_bufferSize);
52- size_t bytesRead = static_cast <size_t >(m_stream->gcount ());
53- writeChunk (bytesRead);
54-
55- if ((m_stream->peek () == EOF || m_stream->eof ()) && !m_stream->bad ()) {
56- writeTrailerToUnderlyingStream ();
57- }
58- }
59-
60- if ((m_chunkingStream->peek () == EOF || m_chunkingStream->eof ()) && !m_chunkingStream->bad ()) {
61- return traits_type::eof ();
62- }
63-
64- m_chunkingStream->read (m_buffer, sizeof (m_buffer));
65- size_t bytesRead = static_cast <size_t >(m_chunkingStream->gcount ());
66- if (bytesRead == 0 ) {
67- return traits_type::eof ();
68- }
69-
70- setg (m_buffer, m_buffer, m_buffer + bytesRead);
71- return traits_type::to_int_type (*gptr ());
70+ // if the underlying stream is empty there is nothing to read
71+ if ((m_chunkingStream->peek () == EOF || m_chunkingStream->eof ()) && !m_chunkingStream->bad ()) {
72+ return traits_type::eof ();
7273 }
7374
74- private:
75- void writeTrailerToUnderlyingStream () {
75+ // Read from chunking stream to internal buffer
76+ m_chunkingStream->read (m_buffer.GetUnderlyingData (), m_buffer.GetLength ());
77+ size_t bytesRead = static_cast <size_t >(m_chunkingStream->gcount ());
78+ if (bytesRead == 0 ) {
79+ return traits_type::eof ();
80+ }
81+
82+ setg (m_buffer.GetUnderlyingData (), m_buffer.GetUnderlyingData (), m_buffer.GetUnderlyingData () + bytesRead);
83+ return traits_type::to_int_type (*gptr ());
84+ }
85+
86+ private:
87+ void writeTrailerToUnderlyingStream () {
7688 Aws::StringStream chunkedTrailerStream;
7789 chunkedTrailerStream << " 0\r\n " ;
7890 if (m_request->GetRequestHash ().second != nullptr ) {
@@ -86,13 +98,13 @@ class AwsChunkedStreamWrapper {
8698 }
8799 *m_chunkingStream << chunkedTrailer;
88100 }
89-
101+
90102 void writeChunk (size_t bytesRead) {
91103 if (m_request->GetRequestHash ().second != nullptr ) {
92104 m_request->GetRequestHash ().second ->Update (reinterpret_cast <unsigned char *>(m_data.GetUnderlyingData ()), bytesRead);
93105 }
94-
95- if (bytesRead > 0 && m_chunkingStream != nullptr && !m_chunkingStream->bad ()) {
106+
107+ if (bytesRead > 0 && m_chunkingStream && !m_chunkingStream->bad ()) {
96108 if (m_chunkingStream->eof ()) {
97109 m_chunkingStream->clear ();
98110 }
@@ -102,16 +114,23 @@ class AwsChunkedStreamWrapper {
102114 }
103115 }
104116
105- Aws::Http::HttpRequest* m_request;
106- std::shared_ptr<Aws::IOStream> m_stream;
107- Aws::Utils::Array<char > m_data;
108- size_t m_bufferSize;
109- std::shared_ptr<Aws::IOStream> m_chunkingStream;
110- char m_buffer[8192 ];
111- };
112-
113- AwsChunkedStreamBuf m_streambuf;
114- Aws::IOStream m_iostream;
117+ std::shared_ptr<Aws::IOStream> m_chunkingStream;
118+ Aws::Http::HttpRequest* m_request{nullptr };
119+ std::shared_ptr<Aws::IOStream> m_stream;
120+ Aws::Utils::Array<char > m_data;
121+ Aws::Utils::Array<char > m_buffer{DataBufferSize};
122+ };
123+
124+ class AwsChunkedIOStream : public Aws ::IOStream {
125+ public:
126+ AwsChunkedIOStream (Aws::Http::HttpRequest* request,
127+ const std::shared_ptr<Aws::IOStream>& originalBody,
128+ size_t bufferSize = AWS_DATA_BUFFER_SIZE)
129+ : Aws::IOStream(&m_buf),
130+ m_buf (request, originalBody, bufferSize) {}
131+
132+ private:
133+ AwsChunkedStreamBuf<> m_buf;
115134};
116135
117136/* *
@@ -159,11 +178,9 @@ class ChunkingInterceptor : public smithy::interceptor::Interceptor {
159178 }
160179 }
161180
162- auto wrapper = Aws::MakeShared<AwsChunkedStreamWrapper >(
181+ auto chunkedBody = Aws::MakeShared<AwsChunkedIOStream >(
163182 ALLOCATION_TAG, request.get (), originalBody);
164- auto chunkedBody = std::shared_ptr<Aws::IOStream>(
165- wrapper, wrapper->GetIOStream ());
166-
183+
167184 request->AddContentBody (chunkedBody);
168185 return request;
169186 }
0 commit comments