Commit 36588dcc authored by commit-queue@webkit.org's avatar commit-queue@webkit.org
Browse files

[WebSocket] Receiving a large message is really slow

https://bugs.webkit.org/show_bug.cgi?id=97237

Patch by Evan Wallace <evan.exe@gmail.com> on 2012-09-21
Reviewed by Alexey Proskuryakov.

WebSocketChannel always reallocates its internal buffer when it receives
and appends new data which causes dramatic slowdowns for messages over
2 MB in size. This patch changes the internal buffer of WebSocketChannel
from a raw char array to a Vector<char> and uses its amortized append()
method. This brings the time to receive a 5 MB message from 5.2 seconds
to 0.25 seconds.

This patch is only for optimization. No new tests are needed.

* Modules/websockets/WebSocketChannel.cpp:
(WebCore::WebSocketChannel::WebSocketChannel):
(WebCore::WebSocketChannel::~WebSocketChannel):
(WebCore::WebSocketChannel::fail):
(WebCore::WebSocketChannel::resume):
(WebCore::WebSocketChannel::didReceiveSocketStreamData):
(WebCore::WebSocketChannel::appendToBuffer):
(WebCore::WebSocketChannel::skipBuffer):
(WebCore::WebSocketChannel::processBuffer):
(WebCore::WebSocketChannel::resumeTimerFired):
(WebCore::WebSocketChannel::processFrame):
* Modules/websockets/WebSocketChannel.h:

git-svn-id: http://svn.webkit.org/repository/webkit/trunk@129239 268f45cc-cd09-0410-ab3c-d52691b4dbfc
parent b99a79f9
2012-09-21 Evan Wallace <evan.exe@gmail.com>
[WebSocket] Receiving a large message is really slow
https://bugs.webkit.org/show_bug.cgi?id=97237
Reviewed by Alexey Proskuryakov.
WebSocketChannel always reallocates its internal buffer when it receives
and appends new data which causes dramatic slowdowns for messages over
2 MB in size. This patch changes the internal buffer of WebSocketChannel
from a raw char array to a Vector<char> and uses its amortized append()
method. This brings the time to receive a 5 MB message from 5.2 seconds
to 0.25 seconds.
This patch is only for optimization. No new tests are needed.
* Modules/websockets/WebSocketChannel.cpp:
(WebCore::WebSocketChannel::WebSocketChannel):
(WebCore::WebSocketChannel::~WebSocketChannel):
(WebCore::WebSocketChannel::fail):
(WebCore::WebSocketChannel::resume):
(WebCore::WebSocketChannel::didReceiveSocketStreamData):
(WebCore::WebSocketChannel::appendToBuffer):
(WebCore::WebSocketChannel::skipBuffer):
(WebCore::WebSocketChannel::processBuffer):
(WebCore::WebSocketChannel::resumeTimerFired):
(WebCore::WebSocketChannel::processFrame):
* Modules/websockets/WebSocketChannel.h:
2012-09-21 Andrey Adaikin <aandrey@chromium.org>
 
Fix build with ENABLE_WEBGL=false
......@@ -73,8 +73,6 @@ const double TCPMaximumSegmentLifetime = 2 * 60.0;
WebSocketChannel::WebSocketChannel(Document* document, WebSocketChannelClient* client)
: m_document(document)
, m_client(client)
, m_buffer(0)
, m_bufferSize(0)
, m_resumeTimer(this, &WebSocketChannel::resumeTimerFired)
, m_suspended(false)
, m_closing(false)
......@@ -97,7 +95,6 @@ WebSocketChannel::WebSocketChannel(Document* document, WebSocketChannelClient* c
WebSocketChannel::~WebSocketChannel()
{
fastFree(m_buffer);
}
void WebSocketChannel::connect(const KURL& url, const String& protocol)
......@@ -206,8 +203,8 @@ void WebSocketChannel::fail(const String& reason)
// once the WebSocket connection is failed (section 7.1.7).
RefPtr<WebSocketChannel> protect(this); // The client can close the channel, potentially removing the last reference.
m_shouldDiscardReceivedData = true;
if (m_buffer)
skipBuffer(m_bufferSize); // Save memory.
if (!m_buffer.isEmpty())
skipBuffer(m_buffer.size()); // Save memory.
m_deflateFramer.didFail();
m_hasContinuousFrame = false;
m_continuousFrameData.clear();
......@@ -238,7 +235,7 @@ void WebSocketChannel::suspend()
void WebSocketChannel::resume()
{
m_suspended = false;
if ((m_buffer || m_closed) && m_client && !m_resumeTimer.isActive())
if ((!m_buffer.isEmpty() || m_closed) && m_client && !m_resumeTimer.isActive())
m_resumeTimer.startOneShot(0);
}
......@@ -312,7 +309,7 @@ void WebSocketChannel::didReceiveSocketStreamData(SocketStreamHandle* handle, co
fail("Ran out of memory while receiving WebSocket data.");
return;
}
while (!m_suspended && m_client && m_buffer)
while (!m_suspended && m_client && !m_buffer.isEmpty())
if (!processBuffer())
break;
}
......@@ -392,55 +389,41 @@ void WebSocketChannel::didFail(int errorCode)
bool WebSocketChannel::appendToBuffer(const char* data, size_t len)
{
size_t newBufferSize = m_bufferSize + len;
if (newBufferSize < m_bufferSize) {
LOG(Network, "WebSocket buffer overflow (%lu+%lu)", static_cast<unsigned long>(m_bufferSize), static_cast<unsigned long>(len));
size_t newBufferSize = m_buffer.size() + len;
if (newBufferSize < m_buffer.size()) {
LOG(Network, "WebSocket buffer overflow (%lu+%lu)", static_cast<unsigned long>(m_buffer.size()), static_cast<unsigned long>(len));
return false;
}
char* newBuffer = 0;
if (!tryFastMalloc(newBufferSize).getValue(newBuffer))
return false;
if (m_buffer)
memcpy(newBuffer, m_buffer, m_bufferSize);
memcpy(newBuffer + m_bufferSize, data, len);
fastFree(m_buffer);
m_buffer = newBuffer;
m_bufferSize = newBufferSize;
m_buffer.append(data, len);
return true;
}
void WebSocketChannel::skipBuffer(size_t len)
{
ASSERT(len <= m_bufferSize);
m_bufferSize -= len;
if (!m_bufferSize) {
fastFree(m_buffer);
m_buffer = 0;
return;
}
memmove(m_buffer, m_buffer + len, m_bufferSize);
ASSERT(len <= m_buffer.size());
memmove(m_buffer.data(), m_buffer.data() + len, m_buffer.size() - len);
m_buffer.resize(m_buffer.size() - len);
}
bool WebSocketChannel::processBuffer()
{
ASSERT(!m_suspended);
ASSERT(m_client);
ASSERT(m_buffer);
LOG(Network, "WebSocketChannel %p processBuffer %lu", this, static_cast<unsigned long>(m_bufferSize));
ASSERT(!m_buffer.isEmpty());
LOG(Network, "WebSocketChannel %p processBuffer %lu", this, static_cast<unsigned long>(m_buffer.size()));
if (m_shouldDiscardReceivedData)
return false;
if (m_receivedClosingHandshake) {
skipBuffer(m_bufferSize);
skipBuffer(m_buffer.size());
return false;
}
RefPtr<WebSocketChannel> protect(this); // The client can close the channel, potentially removing the last reference.
if (m_handshake->mode() == WebSocketHandshake::Incomplete) {
int headerLength = m_handshake->readServerHandshake(m_buffer, m_bufferSize);
int headerLength = m_handshake->readServerHandshake(m_buffer.data(), m_buffer.size());
if (headerLength <= 0)
return false;
if (m_handshake->mode() == WebSocketHandshake::Connected) {
......@@ -456,8 +439,8 @@ bool WebSocketChannel::processBuffer()
LOG(Network, "WebSocketChannel %p connected", this);
skipBuffer(headerLength);
m_client->didConnect();
LOG(Network, "remaining in read buf %lu", static_cast<unsigned long>(m_bufferSize));
return m_buffer;
LOG(Network, "remaining in read buf %lu", static_cast<unsigned long>(m_buffer.size()));
return !m_buffer.isEmpty();
}
ASSERT(m_handshake->mode() == WebSocketHandshake::Failed);
LOG(Network, "WebSocketChannel %p connection failed", this);
......@@ -477,7 +460,7 @@ void WebSocketChannel::resumeTimerFired(Timer<WebSocketChannel>* timer)
ASSERT_UNUSED(timer, timer == &m_resumeTimer);
RefPtr<WebSocketChannel> protect(this); // The client can close the channel, potentially removing the last reference.
while (!m_suspended && m_client && m_buffer)
while (!m_suspended && m_client && !m_buffer.isEmpty())
if (!processBuffer())
break;
if (!m_suspended && m_client && m_closed && m_handle)
......@@ -517,12 +500,12 @@ void WebSocketChannel::closingTimerFired(Timer<WebSocketChannel>* timer)
bool WebSocketChannel::processFrame()
{
ASSERT(m_buffer);
ASSERT(!m_buffer.isEmpty());
WebSocketFrame frame;
const char* frameEnd;
String errorString;
WebSocketFrame::ParseFrameResult result = WebSocketFrame::parseFrame(m_buffer, m_bufferSize, frame, frameEnd, errorString);
WebSocketFrame::ParseFrameResult result = WebSocketFrame::parseFrame(m_buffer.data(), m_buffer.size(), frame, frameEnd, errorString);
if (result == WebSocketFrame::FrameIncomplete)
return false;
if (result == WebSocketFrame::FrameError) {
......@@ -530,8 +513,8 @@ bool WebSocketChannel::processFrame()
return false;
}
ASSERT(m_buffer < frameEnd);
ASSERT(frameEnd <= m_buffer + m_bufferSize);
ASSERT(m_buffer.data() < frameEnd);
ASSERT(frameEnd <= m_buffer.data() + m_buffer.size());
OwnPtr<InflateResultHolder> inflateResult = m_deflateFramer.inflate(frame);
if (!inflateResult->succeeded()) {
......@@ -585,7 +568,7 @@ bool WebSocketChannel::processFrame()
return false;
}
m_continuousFrameData.append(frame.payload, frame.payloadLength);
skipBuffer(frameEnd - m_buffer);
skipBuffer(frameEnd - m_buffer.data());
if (frame.final) {
// onmessage handler may eventually call the other methods of this channel,
// so we should pretend that we have finished to read this frame and
......@@ -617,7 +600,7 @@ bool WebSocketChannel::processFrame()
message = String::fromUTF8(frame.payload, frame.payloadLength);
else
message = "";
skipBuffer(frameEnd - m_buffer);
skipBuffer(frameEnd - m_buffer.data());
if (message.isNull())
fail("Could not decode a text frame as UTF-8.");
else
......@@ -627,7 +610,7 @@ bool WebSocketChannel::processFrame()
m_continuousFrameOpCode = WebSocketFrame::OpCodeText;
ASSERT(m_continuousFrameData.isEmpty());
m_continuousFrameData.append(frame.payload, frame.payloadLength);
skipBuffer(frameEnd - m_buffer);
skipBuffer(frameEnd - m_buffer.data());
}
break;
......@@ -635,14 +618,14 @@ bool WebSocketChannel::processFrame()
if (frame.final) {
OwnPtr<Vector<char> > binaryData = adoptPtr(new Vector<char>(frame.payloadLength));
memcpy(binaryData->data(), frame.payload, frame.payloadLength);
skipBuffer(frameEnd - m_buffer);
skipBuffer(frameEnd - m_buffer.data());
m_client->didReceiveBinaryData(binaryData.release());
} else {
m_hasContinuousFrame = true;
m_continuousFrameOpCode = WebSocketFrame::OpCodeBinary;
ASSERT(m_continuousFrameData.isEmpty());
m_continuousFrameData.append(frame.payload, frame.payloadLength);
skipBuffer(frameEnd - m_buffer);
skipBuffer(frameEnd - m_buffer.data());
}
break;
......@@ -667,7 +650,7 @@ bool WebSocketChannel::processFrame()
m_closeEventReason = String::fromUTF8(&frame.payload[2], frame.payloadLength - 2);
else
m_closeEventReason = "";
skipBuffer(frameEnd - m_buffer);
skipBuffer(frameEnd - m_buffer.data());
m_receivedClosingHandshake = true;
startClosingHandshake(m_closeEventCode, m_closeEventReason);
if (m_closing) {
......@@ -678,22 +661,22 @@ bool WebSocketChannel::processFrame()
case WebSocketFrame::OpCodePing:
enqueueRawFrame(WebSocketFrame::OpCodePong, frame.payload, frame.payloadLength);
skipBuffer(frameEnd - m_buffer);
skipBuffer(frameEnd - m_buffer.data());
break;
case WebSocketFrame::OpCodePong:
// A server may send a pong in response to our ping, or an unsolicited pong which is not associated with
// any specific ping. Either way, there's nothing to do on receipt of pong.
skipBuffer(frameEnd - m_buffer);
skipBuffer(frameEnd - m_buffer.data());
break;
default:
ASSERT_NOT_REACHED();
skipBuffer(frameEnd - m_buffer);
skipBuffer(frameEnd - m_buffer.data());
break;
}
return m_buffer;
return !m_buffer.isEmpty();
}
void WebSocketChannel::enqueueTextFrame(const CString& string)
......
......@@ -195,8 +195,7 @@ private:
WebSocketChannelClient* m_client;
OwnPtr<WebSocketHandshake> m_handshake;
RefPtr<SocketStreamHandle> m_handle;
char* m_buffer;
size_t m_bufferSize;
Vector<char> m_buffer;
Timer<WebSocketChannel> m_resumeTimer;
bool m_suspended;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment