Commit 00bba7de authored by ukai@chromium.org's avatar ukai@chromium.org

2010-05-10 Fumitoshi Ukai <ukai@chromium.org>

        Reviewed by Alexey Proskuryakov.

        WebSocket needs to suspend/resume as Active DOM object.
        https://bugs.webkit.org/show_bug.cgi?id=38171

        Implement suspend()/resume() in WebSocket and WebSocketChannel.
        While WebSocketChannel is suspended, it only adds received data in m_buffer
        or record the handle was closed, and report no event to WebSocket.
        When resumed, it will process buffer or handle closing.
        Since suspend/resume would be called while processing JavaScript event handler (e.g. before/after alert()), WebSocketChannel method that would fire an event need to be reentrant.
        So, WebSocketChannel::processBuffer() call WebSocket to fire an event at most once and skips buffer before the calling,
        so that next call of processBuffer() would process the next frame.

        * websockets/ThreadableWebSocketChannel.h:
        * websockets/ThreadableWebSocketChannelClientWrapper.h:
        (WebCore::ThreadableWebSocketChannelClientWrapper::didConnect):
          Mark channel is opened, and process pending events if not suspended.
        (WebCore::ThreadableWebSocketChannelClientWrapper::didReceiveMessage):
          Push message back in pending queue, and process pending events if not suspended.
        (WebCore::ThreadableWebSocketChannelClientWrapper::didClose):
          Mark channel is closed, and process pending events if not suspended.
        (WebCore::ThreadableWebSocketChannelClientWrapper::suspend):
          Mark suspended.
        (WebCore::ThreadableWebSocketChannelClientWrapper::resume):
          Unmark suspended, and process pending events.
        (WebCore::ThreadableWebSocketChannelClientWrapper::ThreadableWebSocketChannelClientWrapper):
        (WebCore::ThreadableWebSocketChannelClientWrapper::processPendingEvents):
        * websockets/WebSocket.cpp:
        (WebCore::WebSocket::canSuspend):
        (WebCore::WebSocket::suspend):
        (WebCore::WebSocket::resume):
        (WebCore::WebSocket::didClose):
        * websockets/WebSocket.h:
        * websockets/WebSocketChannel.cpp:
        (WebCore::WebSocketChannel::WebSocketChannel):
        (WebCore::WebSocketChannel::connect):
        (WebCore::WebSocketChannel::send):
        (WebCore::WebSocketChannel::bufferedAmount):
        (WebCore::WebSocketChannel::close):
        (WebCore::WebSocketChannel::suspend):
        (WebCore::WebSocketChannel::resume):
         When resumed, it will process buffer and
         handle closing if handle was already closed while suspended.
        (WebCore::WebSocketChannel::didClose):
         If suspended, record unhandled bufferedAmount and set m_closed true, so that closing will be processed when resumed.
        (WebCore::WebSocketChannel::didReceiveData):
         Add received data in buffer and process buffer while it is not suspended.
        (WebCore::WebSocketChannel::processBuffer):
         Process handshake header or one frame message.
         Return true if there are more data to be processed.
         Return false otherwise (e.g. incomplete handshake header or incomplete frame).
        * websockets/WebSocketChannel.h:
        * websockets/WorkerThreadableWebSocketChannel.cpp:
        (WebCore::WorkerThreadableWebSocketChannel::suspend):
        (WebCore::WorkerThreadableWebSocketChannel::resume):
        (WebCore::WorkerThreadableWebSocketChannel::Peer::suspend):
        (WebCore::WorkerThreadableWebSocketChannel::Peer::resume):
        (WebCore::WorkerThreadableWebSocketChannel::mainThreadSuspend):
        (WebCore::WorkerThreadableWebSocketChannel::Bridge::suspend):
        (WebCore::WorkerThreadableWebSocketChannel::mainThreadResume):
        (WebCore::WorkerThreadableWebSocketChannel::Bridge::resume):
        * websockets/WorkerThreadableWebSocketChannel.h:

git-svn-id: http://svn.webkit.org/repository/webkit/trunk@59116 268f45cc-cd09-0410-ab3c-d52691b4dbfc
parent d12c0884
2010-05-10 Fumitoshi Ukai <ukai@chromium.org>
Reviewed by Alexey Proskuryakov.
WebSocket needs to suspend/resume as Active DOM object.
https://bugs.webkit.org/show_bug.cgi?id=38171
Implement suspend()/resume() in WebSocket and WebSocketChannel.
While WebSocketChannel is suspended, it only adds received data in m_buffer
or record the handle was closed, and report no event to WebSocket.
When resumed, it will process buffer or handle closing.
Since suspend/resume would be called while processing JavaScript event handler (e.g. before/after alert()), WebSocketChannel method that would fire an event need to be reentrant.
So, WebSocketChannel::processBuffer() call WebSocket to fire an event at most once and skips buffer before the calling,
so that next call of processBuffer() would process the next frame.
* websockets/ThreadableWebSocketChannel.h:
* websockets/ThreadableWebSocketChannelClientWrapper.h:
(WebCore::ThreadableWebSocketChannelClientWrapper::didConnect):
Mark channel is opened, and process pending events if not suspended.
(WebCore::ThreadableWebSocketChannelClientWrapper::didReceiveMessage):
Push message back in pending queue, and process pending events if not suspended.
(WebCore::ThreadableWebSocketChannelClientWrapper::didClose):
Mark channel is closed, and process pending events if not suspended.
(WebCore::ThreadableWebSocketChannelClientWrapper::suspend):
Mark suspended.
(WebCore::ThreadableWebSocketChannelClientWrapper::resume):
Unmark suspended, and process pending events.
(WebCore::ThreadableWebSocketChannelClientWrapper::ThreadableWebSocketChannelClientWrapper):
(WebCore::ThreadableWebSocketChannelClientWrapper::processPendingEvents):
* websockets/WebSocket.cpp:
(WebCore::WebSocket::canSuspend):
(WebCore::WebSocket::suspend):
(WebCore::WebSocket::resume):
(WebCore::WebSocket::didClose):
* websockets/WebSocket.h:
* websockets/WebSocketChannel.cpp:
(WebCore::WebSocketChannel::WebSocketChannel):
(WebCore::WebSocketChannel::connect):
(WebCore::WebSocketChannel::send):
(WebCore::WebSocketChannel::bufferedAmount):
(WebCore::WebSocketChannel::close):
(WebCore::WebSocketChannel::suspend):
(WebCore::WebSocketChannel::resume):
When resumed, it will process buffer and
handle closing if handle was already closed while suspended.
(WebCore::WebSocketChannel::didClose):
If suspended, record unhandled bufferedAmount and set m_closed true, so that closing will be processed when resumed.
(WebCore::WebSocketChannel::didReceiveData):
Add received data in buffer and process buffer while it is not suspended.
(WebCore::WebSocketChannel::processBuffer):
Process handshake header or one frame message.
Return true if there are more data to be processed.
Return false otherwise (e.g. incomplete handshake header or incomplete frame).
* websockets/WebSocketChannel.h:
* websockets/WorkerThreadableWebSocketChannel.cpp:
(WebCore::WorkerThreadableWebSocketChannel::suspend):
(WebCore::WorkerThreadableWebSocketChannel::resume):
(WebCore::WorkerThreadableWebSocketChannel::Peer::suspend):
(WebCore::WorkerThreadableWebSocketChannel::Peer::resume):
(WebCore::WorkerThreadableWebSocketChannel::mainThreadSuspend):
(WebCore::WorkerThreadableWebSocketChannel::Bridge::suspend):
(WebCore::WorkerThreadableWebSocketChannel::mainThreadResume):
(WebCore::WorkerThreadableWebSocketChannel::Bridge::resume):
* websockets/WorkerThreadableWebSocketChannel.h:
2010-05-07 Dumitru Daniliuc <dumi@chromium.org>
Reviewed by Brady Eidson.
......@@ -53,6 +53,9 @@ public:
virtual void close() = 0;
virtual void disconnect() = 0; // Will suppress didClose().
virtual void suspend() = 0;
virtual void resume() = 0;
void ref() { refThreadableWebSocketChannel(); }
void deref() { derefThreadableWebSocketChannel(); }
......
......@@ -33,9 +33,11 @@
#if ENABLE(WEB_SOCKETS)
#include "PlatformString.h"
#include "WebSocketChannelClient.h"
#include <wtf/PassRefPtr.h>
#include <wtf/Threading.h>
#include <wtf/Vector.h>
namespace WebCore {
......@@ -89,20 +91,35 @@ public:
void didConnect()
{
if (m_client)
m_client->didConnect();
m_pendingConnected = true;
if (!m_suspended)
processPendingEvents();
}
void didReceiveMessage(const String& msg)
{
if (m_client)
m_client->didReceiveMessage(msg);
m_pendingMessages.append(msg);
if (!m_suspended)
processPendingEvents();
}
void didClose(unsigned long unhandledBufferedAmount)
{
if (m_client)
m_client->didClose(unhandledBufferedAmount);
m_pendingClosed = true;
m_bufferedAmount = unhandledBufferedAmount;
if (!m_suspended)
processPendingEvents();
}
void suspend()
{
m_suspended = true;
}
void resume()
{
m_suspended = false;
processPendingEvents();
}
protected:
......@@ -111,13 +128,43 @@ protected:
, m_syncMethodDone(false)
, m_sent(false)
, m_bufferedAmount(0)
, m_suspended(false)
, m_pendingConnected(false)
, m_pendingClosed(false)
{
}
void processPendingEvents()
{
ASSERT(!m_suspended);
if (m_pendingConnected) {
m_pendingConnected = false;
if (m_client)
m_client->didConnect();
}
Vector<String> messages;
messages.swap(m_pendingMessages);
for (Vector<String>::const_iterator iter = messages.begin(); iter != messages.end(); ++iter) {
if (m_client)
m_client->didReceiveMessage(*iter);
}
if (m_pendingClosed) {
m_pendingClosed = false;
if (m_client)
m_client->didClose(m_bufferedAmount);
}
}
WebSocketChannelClient* m_client;
bool m_syncMethodDone;
bool m_sent;
unsigned long m_bufferedAmount;
bool m_suspended;
bool m_pendingConnected;
Vector<String> m_pendingMessages;
bool m_pendingClosed;
};
} // namespace WebCore
......
......@@ -212,6 +212,23 @@ void WebSocket::contextDestroyed()
ActiveDOMObject::contextDestroyed();
}
bool WebSocket::canSuspend() const
{
return !m_channel;
}
void WebSocket::suspend()
{
if (m_channel)
m_channel->suspend();
}
void WebSocket::resume()
{
if (m_channel)
m_channel->resume();
}
void WebSocket::stop()
{
bool pending = hasPendingActivity();
......@@ -259,6 +276,8 @@ void WebSocket::didReceiveMessageError()
void WebSocket::didClose(unsigned long unhandledBufferedAmount)
{
LOG(Network, "WebSocket %p didClose", this);
if (!m_channel)
return;
m_state = CLOSED;
m_bufferedAmountAfterClose += unhandledBufferedAmount;
ASSERT(scriptExecutionContext());
......
......@@ -84,6 +84,9 @@ namespace WebCore {
virtual ScriptExecutionContext* scriptExecutionContext() const;
virtual void contextDestroyed();
virtual bool canSuspend() const;
virtual void suspend();
virtual void resume();
virtual void stop();
using RefCounted<WebSocket>::ref;
......
......@@ -58,6 +58,9 @@ WebSocketChannel::WebSocketChannel(ScriptExecutionContext* context, WebSocketCha
, m_handshake(url, protocol, context)
, m_buffer(0)
, m_bufferSize(0)
, m_suspended(false)
, m_closed(false)
, m_unhandledBufferedAmount(0)
{
}
......@@ -70,6 +73,7 @@ void WebSocketChannel::connect()
{
LOG(Network, "WebSocketChannel %p connect", this);
ASSERT(!m_handle);
ASSERT(!m_suspended);
m_handshake.reset();
ref();
m_handle = SocketStreamHandle::create(m_handshake.url(), this);
......@@ -79,6 +83,7 @@ bool WebSocketChannel::send(const String& msg)
{
LOG(Network, "WebSocketChannel %p send %s", this, msg.utf8().data());
ASSERT(m_handle);
ASSERT(!m_suspended);
Vector<char> buf;
buf.append('\0'); // frame type
buf.append(msg.utf8().data(), msg.utf8().length());
......@@ -90,12 +95,14 @@ unsigned long WebSocketChannel::bufferedAmount() const
{
LOG(Network, "WebSocketChannel %p bufferedAmount", this);
ASSERT(m_handle);
ASSERT(!m_suspended);
return m_handle->bufferedAmount();
}
void WebSocketChannel::close()
{
LOG(Network, "WebSocketChannel %p close", this);
ASSERT(!m_suspended);
if (m_handle)
m_handle->close(); // will call didClose()
}
......@@ -110,6 +117,22 @@ void WebSocketChannel::disconnect()
m_handle->close();
}
void WebSocketChannel::suspend()
{
m_suspended = true;
}
void WebSocketChannel::resume()
{
m_suspended = false;
RefPtr<WebSocketChannel> protect(this); // The client can close the channel, potentially removing the last reference.
while (!m_suspended && m_client && m_buffer)
if (!processBuffer())
break;
if (!m_suspended && m_client && m_closed && m_handle)
didClose(m_handle.get());
}
void WebSocketChannel::didOpen(SocketStreamHandle* handle)
{
LOG(Network, "WebSocketChannel %p didOpen", this);
......@@ -127,14 +150,17 @@ void WebSocketChannel::didClose(SocketStreamHandle* handle)
{
LOG(Network, "WebSocketChannel %p didClose", this);
ASSERT_UNUSED(handle, handle == m_handle || !m_handle);
m_closed = true;
if (m_handle) {
unsigned long unhandledBufferedAmount = m_handle->bufferedAmount();
m_unhandledBufferedAmount = m_handle->bufferedAmount();
if (m_suspended)
return;
WebSocketChannelClient* client = m_client;
m_client = 0;
m_context = 0;
m_handle = 0;
if (client)
client->didClose(unhandledBufferedAmount);
client->didClose(m_unhandledBufferedAmount);
}
deref();
}
......@@ -155,87 +181,9 @@ void WebSocketChannel::didReceiveData(SocketStreamHandle* handle, const char* da
handle->close();
return;
}
if (m_handshake.mode() == WebSocketHandshake::Incomplete) {
int headerLength = m_handshake.readServerHandshake(m_buffer, m_bufferSize);
if (headerLength <= 0)
return;
switch (m_handshake.mode()) {
case WebSocketHandshake::Connected:
if (!m_handshake.serverSetCookie().isEmpty()) {
if (m_context->isDocument()) {
Document* document = static_cast<Document*>(m_context);
if (cookiesEnabled(document)) {
ExceptionCode ec; // Exception (for sandboxed documents) ignored.
document->setCookie(m_handshake.serverSetCookie(), ec);
}
}
}
// FIXME: handle set-cookie2.
LOG(Network, "WebSocketChannel %p connected", this);
m_client->didConnect();
if (!m_client)
return;
while (!m_suspended && m_client && m_buffer)
if (!processBuffer())
break;
default:
LOG(Network, "WebSocketChannel %p connection failed", this);
handle->close();
return;
}
skipBuffer(headerLength);
if (!m_buffer)
return;
LOG(Network, "remaining in read buf %ul", m_bufferSize);
}
if (m_handshake.mode() != WebSocketHandshake::Connected)
return;
const char* nextFrame = m_buffer;
const char* p = m_buffer;
const char* end = p + m_bufferSize;
while (p < end) {
unsigned char frameByte = static_cast<unsigned char>(*p++);
if ((frameByte & 0x80) == 0x80) {
int length = 0;
while (p < end) {
if (length > std::numeric_limits<int>::max() / 128) {
LOG(Network, "frame length overflow %d", length);
m_client->didReceiveMessageError();
if (!m_client)
return;
handle->close();
return;
}
char msgByte = *p;
length = length * 128 + (msgByte & 0x7f);
++p;
if (!(msgByte & 0x80))
break;
}
if (p + length < end) {
p += length;
nextFrame = p;
m_client->didReceiveMessageError();
if (!m_client)
return;
} else
break;
} else {
const char* msgStart = p;
while (p < end && *p != '\xff')
++p;
if (p < end && *p == '\xff') {
if (frameByte == 0x00)
m_client->didReceiveMessage(String::fromUTF8(msgStart, p - msgStart));
else
m_client->didReceiveMessageError();
if (!m_client)
return;
++p;
nextFrame = p;
}
}
}
skipBuffer(nextFrame - m_buffer);
}
void WebSocketChannel::didFail(SocketStreamHandle* handle, const SocketStreamError&)
......@@ -281,6 +229,95 @@ void WebSocketChannel::skipBuffer(int len)
memmove(m_buffer, m_buffer + len, m_bufferSize);
}
bool WebSocketChannel::processBuffer()
{
ASSERT(!m_suspended);
ASSERT(m_client);
ASSERT(m_buffer);
if (m_handshake.mode() == WebSocketHandshake::Incomplete) {
int headerLength = m_handshake.readServerHandshake(m_buffer, m_bufferSize);
if (headerLength <= 0)
return false;
if (m_handshake.mode() == WebSocketHandshake::Connected) {
if (!m_handshake.serverSetCookie().isEmpty()) {
if (m_context->isDocument()) {
Document* document = static_cast<Document*>(m_context);
if (cookiesEnabled(document)) {
ExceptionCode ec; // Exception (for sandboxed documents) ignored.
document->setCookie(m_handshake.serverSetCookie(), ec);
}
}
}
// FIXME: handle set-cookie2.
LOG(Network, "WebSocketChannel %p connected", this);
skipBuffer(headerLength);
m_client->didConnect();
LOG(Network, "remaining in read buf %ul", m_bufferSize);
return m_buffer;
}
LOG(Network, "WebSocketChannel %p connection failed", this);
skipBuffer(headerLength);
if (!m_closed)
m_handle->close();
return false;
}
if (m_handshake.mode() != WebSocketHandshake::Connected)
return false;
const char* nextFrame = m_buffer;
const char* p = m_buffer;
const char* end = p + m_bufferSize;
unsigned char frameByte = static_cast<unsigned char>(*p++);
if ((frameByte & 0x80) == 0x80) {
int length = 0;
while (p < end) {
if (length > std::numeric_limits<int>::max() / 128) {
LOG(Network, "frame length overflow %d", length);
skipBuffer(p + length - m_buffer);
m_client->didReceiveMessageError();
if (!m_client)
return false;
if (!m_closed)
m_handle->close();
return false;
}
char msgByte = *p;
length = length * 128 + (msgByte & 0x7f);
++p;
if (!(msgByte & 0x80))
break;
}
if (p + length < end) {
p += length;
nextFrame = p;
skipBuffer(nextFrame - m_buffer);
m_client->didReceiveMessageError();
return m_buffer;
}
return false;
}
const char* msgStart = p;
while (p < end && *p != '\xff')
++p;
if (p < end && *p == '\xff') {
int msgLength = p - msgStart;
++p;
nextFrame = p;
if (frameByte == 0x00) {
String msg = String::fromUTF8(msgStart, msgLength);
skipBuffer(nextFrame - m_buffer);
m_client->didReceiveMessage(msg);
} else {
skipBuffer(nextFrame - m_buffer);
m_client->didReceiveMessageError();
}
return m_buffer;
}
return false;
}
} // namespace WebCore
#endif // ENABLE(WEB_SOCKETS)
......@@ -35,8 +35,10 @@
#include "SocketStreamHandleClient.h"
#include "ThreadableWebSocketChannel.h"
#include "Timer.h"
#include "WebSocketHandshake.h"
#include <wtf/RefCounted.h>
#include <wtf/Vector.h>
namespace WebCore {
......@@ -57,6 +59,9 @@ namespace WebCore {
virtual void close();
virtual void disconnect(); // Will suppress didClose().
virtual void suspend();
virtual void resume();
virtual void didOpen(SocketStreamHandle*);
virtual void didClose(SocketStreamHandle*);
virtual void didReceiveData(SocketStreamHandle*, const char*, int);
......@@ -76,6 +81,7 @@ namespace WebCore {
bool appendToBuffer(const char* data, int len);
void skipBuffer(int len);
bool processBuffer();
ScriptExecutionContext* m_context;
WebSocketChannelClient* m_client;
......@@ -83,6 +89,10 @@ namespace WebCore {
RefPtr<SocketStreamHandle> m_handle;
char* m_buffer;
int m_bufferSize;
bool m_suspended;
bool m_closed;
unsigned long m_unhandledBufferedAmount;
};
} // namespace WebCore
......
......@@ -94,6 +94,20 @@ void WorkerThreadableWebSocketChannel::disconnect()
m_bridge.clear();
}
void WorkerThreadableWebSocketChannel::suspend()
{
m_workerClientWrapper->suspend();
if (m_bridge)
m_bridge->suspend();
}
void WorkerThreadableWebSocketChannel::resume()
{
m_workerClientWrapper->resume();
if (m_bridge)
m_bridge->resume();
}
WorkerThreadableWebSocketChannel::Peer::Peer(RefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, WorkerLoaderProxy& loaderProxy, ScriptExecutionContext* context, const String& taskMode, const KURL& url, const String& protocol)
: m_workerClientWrapper(clientWrapper)
, m_loaderProxy(loaderProxy)
......@@ -166,6 +180,22 @@ void WorkerThreadableWebSocketChannel::Peer::disconnect()
m_mainWebSocketChannel = 0;
}
void WorkerThreadableWebSocketChannel::Peer::suspend()
{
ASSERT(isMainThread());
if (!m_mainWebSocketChannel)
return;
m_mainWebSocketChannel->suspend();
}
void WorkerThreadableWebSocketChannel::Peer::resume()
{
ASSERT(isMainThread());
if (!m_mainWebSocketChannel)
return;
m_mainWebSocketChannel->resume();
}
static void workerContextDidConnect(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
{
ASSERT_UNUSED(context, context->isWorkerContext());
......@@ -333,6 +363,36 @@ void WorkerThreadableWebSocketChannel::Bridge::disconnect()
m_workerContext = 0;
}
void WorkerThreadableWebSocketChannel::mainThreadSuspend(ScriptExecutionContext* context, Peer* peer)
{
ASSERT(isMainThread());
ASSERT_UNUSED(context, context->isDocument());
ASSERT(peer);
peer->suspend();
}
void WorkerThreadableWebSocketChannel::Bridge::suspend()
{
ASSERT(m_peer);
m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSuspend, m_peer));
}
void WorkerThreadableWebSocketChannel::mainThreadResume(ScriptExecutionContext* context, Peer* peer)
{
ASSERT(isMainThread());
ASSERT_UNUSED(context, context->isDocument());
ASSERT(peer);
peer->resume();
}
void WorkerThreadableWebSocketChannel::Bridge::resume()
{
ASSERT(m_peer);
m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadResume, m_peer));
}
void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper()
{
m_workerClientWrapper->clearClient();
......
......@@ -64,6 +64,8 @@ public:
virtual unsigned long bufferedAmount() const;
virtual void close();
virtual void disconnect(); // Will suppress didClose().
virtual void suspend();
virtual void resume();
using RefCounted<WorkerThreadableWebSocketChannel>::ref;
using RefCounted<WorkerThreadableWebSocketChannel>::deref;
......@@ -88,6 +90,8 @@ private:
void bufferedAmount();
void close();
void disconnect();
void suspend();
void resume();
virtual void didConnect();
virtual void didReceiveMessage(const String& message);
......@@ -115,6 +119,8 @@ private:
unsigned long bufferedAmount();
void close();
void disconnect();
void suspend();
void resume();
using RefCounted<Bridge>::ref;