diff --git a/PackageScript b/PackageScript index f6ba884..44d7480 100644 --- a/PackageScript +++ b/PackageScript @@ -1,4 +1,4 @@ -# vim: set ts=8 sts=2 sw=2 tw=99 et ft=python: +# vim: set ts=8 sts=2 sw=2 tw=99 et ft=python: import os builder.SetBuildFolder('package') @@ -27,4 +27,4 @@ def CopyFiles(src, dest, files): source_path = os.path.join(builder.sourcePath, src, source_file) builder.AddCopy(source_path, dest_entry) -CopyFiles('pawn', 'addons/sourcemod/scripting/include', ['async_socket.inc']) \ No newline at end of file +CopyFiles('pawn', 'addons/sourcemod/scripting/include', ['AsyncSocket.inc']) \ No newline at end of file diff --git a/extension/AMBuilder b/extension/AMBuilder index 8e19f18..20d83d5 100644 --- a/extension/AMBuilder +++ b/extension/AMBuilder @@ -1,7 +1,7 @@ # vim: set sts=2 ts=8 sw=2 tw=99 et ft=python: import os -binary = builder.compiler.Library('async_socket.ext') +binary = builder.compiler.Library('AsyncSocket.ext') AsyncSocket.ConfigureForExtension(builder, binary.compiler) binary.compiler.includes += [ diff --git a/extension/context.cpp b/extension/context.cpp index 8b739e2..ebad229 100644 --- a/extension/context.cpp +++ b/extension/context.cpp @@ -1,11 +1,20 @@ +#include "extension.h" #include "context.h" CAsyncSocketContext::CAsyncSocketContext(IPluginContext *pContext) { - this->m_pContext = pContext; + m_pContext = pContext; - socket = NULL; - stream = NULL; + m_pHost = NULL; + m_Port = -1; + + m_Deleted = false; + m_PendingCallback = false; + m_Pending = false; + m_Server = false; + + m_pSocket = NULL; + m_pStream = NULL; m_pConnectCallback = NULL; m_pErrorCallback = NULL; @@ -14,8 +23,8 @@ CAsyncSocketContext::CAsyncSocketContext(IPluginContext *pContext) CAsyncSocketContext::~CAsyncSocketContext() { - if(socket != NULL) - uv_close((uv_handle_t *)socket, NULL); + if(m_pHost) + free(m_pHost); if(m_pConnectCallback) forwards->ReleaseForward(m_pConnectCallback); @@ -25,19 +34,35 @@ CAsyncSocketContext::~CAsyncSocketContext() if(m_pDataCallback) forwards->ReleaseForward(m_pDataCallback); + + m_Deleted = true; } +// Client void CAsyncSocketContext::Connected() { + m_PendingCallback = false; if(!m_pConnectCallback) return; m_pConnectCallback->PushCell(m_Handle); - m_pConnectCallback->Execute(NULL); + m_pConnectCallback->Execute(NULL); +} + +// Server +void CAsyncSocketContext::OnConnect(CAsyncSocketContext *pSocketContext) +{ + m_PendingCallback = false; + if(!m_pConnectCallback) + return; + + m_pConnectCallback->PushCell(pSocketContext->m_Handle); + m_pConnectCallback->Execute(NULL); } void CAsyncSocketContext::OnError(int error) { + m_PendingCallback = false; if(!m_pErrorCallback) return; @@ -49,13 +74,14 @@ void CAsyncSocketContext::OnError(int error) void CAsyncSocketContext::OnData(char* data, ssize_t size) { + m_PendingCallback = false; if(!m_pDataCallback) return; m_pDataCallback->PushCell(m_Handle); m_pDataCallback->PushString(data); m_pDataCallback->PushCell(size); - m_pDataCallback->Execute(NULL); + m_pDataCallback->Execute(NULL); } bool CAsyncSocketContext::SetConnectCallback(funcid_t function) diff --git a/extension/context.h b/extension/context.h index b8efd60..bb827d5 100644 --- a/extension/context.h +++ b/extension/context.h @@ -12,6 +12,11 @@ public: IPluginContext *m_pContext; Handle_t m_Handle; + bool m_Deleted; + bool m_PendingCallback; + bool m_Pending; + bool m_Server; + char *m_pHost; int m_Port; @@ -19,14 +24,15 @@ public: IChangeableForward *m_pErrorCallback; IChangeableForward *m_pDataCallback; - uv_getaddrinfo_t resolver; - uv_tcp_t *socket; - uv_stream_t *stream; + uv_getaddrinfo_t m_Resolver; + uv_tcp_t *m_pSocket; + uv_stream_t *m_pStream; CAsyncSocketContext(IPluginContext *plugin); ~CAsyncSocketContext(); void Connected(); + void OnConnect(CAsyncSocketContext *pSocketContext); void OnError(int error); diff --git a/extension/extension.cpp b/extension/extension.cpp index 96d3dc8..19222f5 100644 --- a/extension/extension.cpp +++ b/extension/extension.cpp @@ -39,7 +39,7 @@ * @brief Implement extension code here. */ -moodycamel::ReaderWriterQueue g_ConnectQueue; +moodycamel::ReaderWriterQueue g_ConnectQueue; moodycamel::ReaderWriterQueue g_ErrorQueue; moodycamel::ReaderWriterQueue g_DataQueue; @@ -49,6 +49,8 @@ uv_thread_t g_UV_LoopThread; uv_async_t g_UV_AsyncAdded; moodycamel::ReaderWriterQueue g_AsyncAddQueue; +int g_MallocHandles = 0; +bool g_Running; AsyncSocket g_AsyncSocket; /**< Global singleton for extension's main interface */ SMEXT_LINK(&g_AsyncSocket); @@ -59,58 +61,92 @@ CAsyncSocketContext *AsyncSocket::GetSocketInstanceByHandle(Handle_t handle) sec.pOwner = NULL; sec.pIdentity = myself->GetIdentity(); - CAsyncSocketContext *pContext; + CAsyncSocketContext *pSocketContext; - if(handlesys->ReadHandle(handle, socketHandleType, &sec, (void **)&pContext) != HandleError_None) + if(handlesys->ReadHandle(handle, socketHandleType, &sec, (void **)&pSocketContext) != HandleError_None) return NULL; - return pContext; + return pSocketContext; } void AsyncSocket::OnHandleDestroy(HandleType_t type, void *object) { if(object != NULL) { - CAsyncSocketContext *pContext = (CAsyncSocketContext *)object; - delete pContext; + CAsyncSocketContext *pSocketContext = (CAsyncSocketContext *)object; + pSocketContext->m_Deleted = true; + + if(g_Running && (pSocketContext->m_pSocket || pSocketContext->m_pStream || pSocketContext->m_PendingCallback)) + { + printf("Delete: Async\n"); + CAsyncAddJob Job; + Job.CallbackFn = UV_DeleteAsyncContext; + Job.pData = pSocketContext; + g_AsyncAddQueue.enqueue(Job); + + uv_async_send(&g_UV_AsyncAdded); + } + else + { + printf("Delete: Now\n"); + delete pSocketContext; + } } } void OnGameFrame(bool simulating) { - CAsyncSocketContext *pContext; - while(g_ConnectQueue.try_dequeue(pContext)) + CSocketConnect *pConnect; + while(g_ConnectQueue.try_dequeue(pConnect)) { - pContext->Connected(); - } + printf("g_ConnectQueue got: "); + if(pConnect->pSocketContext->m_Server) + { + printf("server\n"); + CAsyncSocketContext *pSocketContext = new CAsyncSocketContext(pConnect->pSocketContext->m_pContext); + pSocketContext->m_Handle = handlesys->CreateHandle(g_AsyncSocket.socketHandleType, pSocketContext, + pConnect->pSocketContext->m_pContext->GetIdentity(), myself->GetIdentity(), NULL); - CSocketError *pError; - while(g_ErrorQueue.try_dequeue(pError)) - { - pError->pAsyncContext->OnError(pError->Error); + pSocketContext->m_pStream = pConnect->pClientSocket; + pSocketContext->m_pStream->data = pSocketContext; - free(pError); + pConnect->pSocketContext->OnConnect(pSocketContext); + + if(!pSocketContext->m_Deleted) + { + printf("add UV_StartRead\n"); + CAsyncAddJob Job; + Job.CallbackFn = UV_StartRead; + Job.pData = pSocketContext; + g_AsyncAddQueue.enqueue(Job); + + uv_async_send(&g_UV_AsyncAdded); + } + } + else + { + printf("client\n"); + pConnect->pSocketContext->Connected(); + } + + free(pConnect); } CSocketData *pData; while(g_DataQueue.try_dequeue(pData)) { - pData->pAsyncContext->OnData(pData->pBuffer, pData->BufferSize); + pData->pSocketContext->OnData(pData->pBuffer, pData->BufferSize); free(pData->pBuffer); free(pData); } -} -void UV_OnAsyncAdded(uv_async_t *pHandle) -{ - CAsyncAddJob Job; - while(g_AsyncAddQueue.try_dequeue(Job)) + CSocketError *pError; + while(g_ErrorQueue.try_dequeue(pError)) { - uv_async_t *pAsync = (uv_async_t *)malloc(sizeof(uv_async_t)); - uv_async_init(g_UV_Loop, pAsync, Job.CallbackFn); - pAsync->data = Job.pData; - uv_async_send(pAsync); + pError->pSocketContext->OnError(pError->Error); + + free(pError); } } @@ -120,22 +156,69 @@ void UV_EventLoop(void *data) uv_run(g_UV_Loop, UV_RUN_DEFAULT); } +void UV_OnAsyncAdded(uv_async_t *pHandle) +{ + CAsyncAddJob Job; + while(g_AsyncAddQueue.try_dequeue(Job)) + { + printf("dequeing job\n"); + uv_async_t *pAsync = (uv_async_t *)malloc(sizeof(uv_async_t)); + g_MallocHandles++; + uv_async_init(g_UV_Loop, pAsync, Job.CallbackFn); + pAsync->data = Job.pData; + pAsync->close_cb = UV_FreeHandle; + uv_async_send(pAsync); + } +} + +void UV_FreeHandle(uv_handle_t *handle) +{ + printf("freeing %p\n", handle); + free(handle); + g_MallocHandles--; +} + void UV_AllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { buf->base = (char *)malloc(suggested_size); buf->len = suggested_size; } -void UV_HandleCleanup(uv_handle_t *handle) +void UV_Quit(uv_async_t *pHandle) { - free(handle); + printf("quithandle: %p\n", pHandle); + uv_close((uv_handle_t *)pHandle, pHandle->close_cb); + + uv_close((uv_handle_t *)&g_UV_AsyncAdded, NULL); + + uv_stop(g_UV_Loop); } -void UV_PushError(CAsyncSocketContext *pContext, int error) +void UV_DeleteAsyncContext(uv_async_t *pHandle) { + CAsyncSocketContext *pSocketContext = (CAsyncSocketContext *)pHandle->data; + uv_close((uv_handle_t *)pHandle, pHandle->close_cb); + if(pSocketContext->m_pStream) + { + uv_close((uv_handle_t *)pSocketContext->m_pStream, pSocketContext->m_pStream->close_cb); + pSocketContext->m_pStream = NULL; + pSocketContext->m_pSocket = NULL; + } + if(pSocketContext->m_pSocket) + { + uv_close((uv_handle_t *)pSocketContext->m_pSocket, pSocketContext->m_pSocket->close_cb); + pSocketContext->m_pSocket = NULL; + } + + delete pSocketContext; +} + +void UV_PushError(CAsyncSocketContext *pSocketContext, int error) +{ + pSocketContext->m_PendingCallback = true; CSocketError *pError = (CSocketError *)malloc(sizeof(CSocketError)); - pError->pAsyncContext = pContext; + pError->pSocketContext = pSocketContext; pError->Error = error; g_ErrorQueue.enqueue(pError); @@ -143,86 +226,204 @@ void UV_PushError(CAsyncSocketContext *pContext, int error) void UV_OnRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { - CAsyncSocketContext *pContext = (CAsyncSocketContext *)client->data; - if(nread < 0) + printf("UV_OnRead()\n"); + CAsyncSocketContext *pSocketContext = (CAsyncSocketContext *)client->data; + if(pSocketContext->m_Deleted) { - // Connection closed - uv_close((uv_handle_t *)client, NULL); - pContext->socket = NULL; - - UV_PushError((CAsyncSocketContext *)client->data, nread); + printf("m_Deleted\n"); + free(buf->base); + uv_close((uv_handle_t *)client, client->close_cb); + pSocketContext->m_pStream = NULL; + pSocketContext->m_pSocket = NULL; return; } + if(nread < 0) + { + printf("nread < 0\n"); + // Connection closed + free(buf->base); + // But let the client disconnect. + //uv_close((uv_handle_t *)client, client->close_cb); + //pSocketContext->m_pStream = NULL; + //pSocketContext->m_pSocket = NULL; + + UV_PushError(pSocketContext, nread); + return; + } + pSocketContext->m_PendingCallback = true; + + printf("nread = %d\n", nread); + char *data = (char *)malloc(sizeof(char) * (nread + 1)); data[nread] = 0; strncpy(data, buf->base, nread); + free(buf->base); CSocketData *pData = (CSocketData *)malloc(sizeof(CSocketData)); - pData->pAsyncContext = pContext; + pData->pSocketContext = pSocketContext; pData->pBuffer = data; pData->BufferSize = nread; g_DataQueue.enqueue(pData); - - free(buf->base); } void UV_OnConnect(uv_connect_t *req, int status) { - CAsyncSocketContext *pContext = (CAsyncSocketContext *)req->data; - - if(status < 0) + CAsyncSocketContext *pSocketContext = (CAsyncSocketContext *)req->data; + if(pSocketContext->m_Deleted) { - UV_PushError(pContext, status); + free(req); + uv_close((uv_handle_t *)req->handle, req->handle->close_cb); return; } - pContext->stream = req->handle; + if(status < 0) + { + free(req); + UV_PushError(pSocketContext, status); + return; + } + pSocketContext->m_PendingCallback = true; - g_ConnectQueue.enqueue(pContext); - - req->handle->data = req->data; + pSocketContext->m_pStream = req->handle; free(req); + pSocketContext->m_pStream->data = pSocketContext; - uv_read_start(pContext->stream, UV_AllocBuffer, UV_OnRead); + CSocketConnect *pConnect = (CSocketConnect *)malloc(sizeof(CSocketConnect)); + pConnect->pSocketContext = pSocketContext; + pConnect->pClientSocket = pSocketContext->m_pStream; + g_ConnectQueue.enqueue(pConnect); + + uv_read_start(pSocketContext->m_pStream, UV_AllocBuffer, UV_OnRead); +} + +void UV_StartRead(uv_async_t *pHandle) +{ + printf("UV_StartRead()\n"); + CAsyncSocketContext *pSocketContext = (CAsyncSocketContext *)pHandle->data; + uv_close((uv_handle_t *)pHandle, pHandle->close_cb); + + if(pSocketContext->m_Deleted || !pSocketContext->m_pStream) + { + printf("UV_StartRead() -> error\n"); + return; + } + + int err = uv_read_start(pSocketContext->m_pStream, UV_AllocBuffer, UV_OnRead); + printf("~UV_StartRead() = %d %s\n", err, uv_err_name(err)); +} + +void UV_OnNewConnection(uv_stream_t *server, int status) +{ + printf("UV_OnNewConnection()\n"); + // server context + CAsyncSocketContext *pSocketContext = (CAsyncSocketContext *)server->data; + if(pSocketContext->m_Deleted) + { + printf("m_Deleted()\n"); + uv_close((uv_handle_t *)server, server->close_cb); + return; + } + + if(status < 0) + { + printf("status < 0\n"); + uv_close((uv_handle_t *)server, server->close_cb); + //uv_close((uv_handle_t *)pSocketContext->m_pSocket, pSocketContext->m_pSocket->close_cb); + UV_PushError(pSocketContext, status); + return; + } + + uv_tcp_t *pClientSocket = (uv_tcp_t *)malloc(sizeof(uv_tcp_t)); + g_MallocHandles++; + uv_tcp_init(g_UV_Loop, pClientSocket); + pClientSocket->close_cb = UV_FreeHandle; + + if(uv_accept((uv_stream_t *)pSocketContext->m_pSocket, (uv_stream_t *)pClientSocket) == 0) + { + printf("accepted\n"); + pSocketContext->m_PendingCallback = true; + CSocketConnect *pConnect = (CSocketConnect *)malloc(sizeof(CSocketConnect)); + pConnect->pSocketContext = pSocketContext; + pConnect->pClientSocket = (uv_stream_t *)pClientSocket; + g_ConnectQueue.enqueue(pConnect); + } + else + { + printf("accept failed\n"); + uv_close((uv_handle_t *)pClientSocket, pClientSocket->close_cb); + } } void UV_OnAsyncResolved(uv_getaddrinfo_t *resolver, int status, struct addrinfo *res) { - free(resolver->service); - CAsyncSocketContext *pContext = (CAsyncSocketContext *) resolver->data; + if(resolver->service != NULL) + free(resolver->service); - if(status < 0) + CAsyncSocketContext *pSocketContext = (CAsyncSocketContext *)resolver->data; + if(pSocketContext->m_Deleted || pSocketContext->m_pSocket) { - UV_PushError(pContext, status); + uv_freeaddrinfo(res); return; } - uv_connect_t *connect_req = (uv_connect_t *)malloc(sizeof(uv_connect_t)); - uv_tcp_t *socket = (uv_tcp_t *)malloc(sizeof(uv_tcp_t)); + if(status < 0) + { + pSocketContext->m_Pending = false; + uv_freeaddrinfo(res); + UV_PushError(pSocketContext, status); + return; + } - pContext->socket = socket; - connect_req->data = pContext; + uv_connect_t *pConnectReq = (uv_connect_t *)malloc(sizeof(uv_connect_t)); + pConnectReq->data = pSocketContext; - char addr[32] = {0}; - uv_ip4_name((struct sockaddr_in *)res->ai_addr, addr, sizeof(addr)); + uv_tcp_t *pSocket = (uv_tcp_t *)malloc(sizeof(uv_tcp_t)); + g_MallocHandles++; + uv_tcp_init(g_UV_Loop, pSocket); + pSocket->close_cb = UV_FreeHandle; + pSocket->data = pSocketContext; - uv_tcp_init(g_UV_Loop, socket); - uv_tcp_connect(connect_req, socket, (const struct sockaddr*) res->ai_addr, UV_OnConnect); + pSocketContext->m_pSocket = pSocket; + pSocketContext->m_Pending = false; + + if(pSocketContext->m_Server) + { + printf("new server\n"); + uv_tcp_bind(pSocket, (const struct sockaddr *)res->ai_addr, 0); + + int err = uv_listen((uv_stream_t *)pSocket, 32, UV_OnNewConnection); + if(err) + { + printf("yes, new server\n"); + uv_freeaddrinfo(res); + uv_close((uv_handle_t *)pSocket, pSocket->close_cb); + UV_PushError(pSocketContext, err); + return; + } + } + else + { + printf("connect client\n"); + uv_tcp_connect(pConnectReq, pSocket, (const struct sockaddr *)res->ai_addr, UV_OnConnect); + } uv_freeaddrinfo(res); } -void UV_OnAsyncResolve(uv_async_t *handle) +void UV_OnAsyncResolve(uv_async_t *pHandle) { - CAsyncSocketContext *pAsyncContext = (CAsyncSocketContext *)handle->data; - uv_close((uv_handle_t *)handle, UV_HandleCleanup); + CAsyncSocketContext *pSocketContext = (CAsyncSocketContext *)pHandle->data; + uv_close((uv_handle_t *)pHandle, pHandle->close_cb); - pAsyncContext->resolver.data = pAsyncContext; + if(pSocketContext->m_Deleted || pSocketContext->m_pSocket) + return; + + pSocketContext->m_Resolver.data = pSocketContext; char *service = (char *)malloc(8); - sprintf(service, "%d", pAsyncContext->m_Port); + sprintf(service, "%d", pSocketContext->m_Port); struct addrinfo hints; hints.ai_family = PF_INET; @@ -230,9 +431,13 @@ void UV_OnAsyncResolve(uv_async_t *handle) hints.ai_protocol = IPPROTO_TCP; hints.ai_flags = 0; - int err = uv_getaddrinfo(g_UV_Loop, &pAsyncContext->resolver, UV_OnAsyncResolved, pAsyncContext->m_pHost, service, &hints); + int err = uv_getaddrinfo(g_UV_Loop, &pSocketContext->m_Resolver, UV_OnAsyncResolved, pSocketContext->m_pHost, service, &hints); if(err) - UV_PushError(pAsyncContext, err); + { + if(service != NULL) + free(service); + UV_PushError(pSocketContext, err); + } } void UV_OnAsyncWriteCleanup(uv_write_t *req, int status) @@ -247,13 +452,14 @@ void UV_OnAsyncWriteCleanup(uv_write_t *req, int status) void UV_OnAsyncWrite(uv_async_t *handle) { + printf("Write 1\n"); CAsyncWrite *pWrite = (CAsyncWrite *)handle->data; - uv_close((uv_handle_t *)handle, UV_HandleCleanup); + uv_close((uv_handle_t *)handle, handle->close_cb); if(pWrite == NULL || pWrite->pBuffer == NULL) return; - if(pWrite->pAsyncContext == NULL || pWrite->pAsyncContext->stream == NULL) + if(pWrite->pSocketContext == NULL || pWrite->pSocketContext->m_pStream == NULL) { free(pWrite->pBuffer->base); free(pWrite->pBuffer); @@ -264,38 +470,81 @@ void UV_OnAsyncWrite(uv_async_t *handle) uv_write_t *req = (uv_write_t *)malloc(sizeof(uv_write_t)); req->data = pWrite; - uv_write(req, pWrite->pAsyncContext->stream, pWrite->pBuffer, 1, UV_OnAsyncWriteCleanup); + printf("Write 2\n"); + uv_write(req, pWrite->pSocketContext->m_pStream, pWrite->pBuffer, 1, UV_OnAsyncWriteCleanup); } cell_t Native_AsyncSocket_Create(IPluginContext *pContext, const cell_t *params) { - CAsyncSocketContext *pAsyncContext = new CAsyncSocketContext(pContext); + CAsyncSocketContext *pSocketContext = new CAsyncSocketContext(pContext); - pAsyncContext->m_Handle = handlesys->CreateHandle(g_AsyncSocket.socketHandleType, pAsyncContext, + pSocketContext->m_Handle = handlesys->CreateHandle(g_AsyncSocket.socketHandleType, pSocketContext, pContext->GetIdentity(), myself->GetIdentity(), NULL); - return pAsyncContext->m_Handle; + return pSocketContext->m_Handle; } cell_t Native_AsyncSocket_Connect(IPluginContext *pContext, const cell_t *params) { - CAsyncSocketContext *pAsyncContext = g_AsyncSocket.GetSocketInstanceByHandle(params[1]); + CAsyncSocketContext *pSocketContext = g_AsyncSocket.GetSocketInstanceByHandle(params[1]); - if(pAsyncContext == NULL) + if(pSocketContext == NULL) return pContext->ThrowNativeError("Invalid socket handle"); if(params[3] < 0 || params[3] > 65535) return pContext->ThrowNativeError("Invalid port specified"); + if(pSocketContext->m_pSocket) + return pContext->ThrowNativeError("Socket is already connected"); + + if(pSocketContext->m_Pending) + return pContext->ThrowNativeError("Socket is currently pending"); + char *address = NULL; pContext->LocalToString(params[2], &address); - pAsyncContext->m_pHost = address; - pAsyncContext->m_Port = params[3]; + pSocketContext->m_pHost = strdup(address); + pSocketContext->m_Port = params[3]; + pSocketContext->m_Server = false; + pSocketContext->m_Pending = true; CAsyncAddJob Job; Job.CallbackFn = UV_OnAsyncResolve; - Job.pData = pAsyncContext; + Job.pData = pSocketContext; + g_AsyncAddQueue.enqueue(Job); + + uv_async_send(&g_UV_AsyncAdded); + + return 1; +} + +cell_t Native_AsyncSocket_Listen(IPluginContext *pContext, const cell_t *params) +{ + CAsyncSocketContext *pSocketContext = g_AsyncSocket.GetSocketInstanceByHandle(params[1]); + + if(pSocketContext == NULL) + return pContext->ThrowNativeError("Invalid socket handle"); + + if(params[3] < 0 || params[3] > 65535) + return pContext->ThrowNativeError("Invalid port specified"); + + if(pSocketContext->m_pSocket) + return pContext->ThrowNativeError("Socket is already connected"); + + if(pSocketContext->m_Pending) + return pContext->ThrowNativeError("Socket is currently pending"); + + char *address = NULL; + pContext->LocalToString(params[2], &address); + + pSocketContext->m_pHost = strdup(address); + pSocketContext->m_Port = params[3]; + pSocketContext->m_Server = true; + pSocketContext->m_Pending = true; + + CAsyncAddJob Job; + Job.CallbackFn = UV_OnAsyncResolve; + Job.pData = pSocketContext; g_AsyncAddQueue.enqueue(Job); uv_async_send(&g_UV_AsyncAdded); @@ -305,11 +554,14 @@ cell_t Native_AsyncSocket_Connect(IPluginContext *pContext, const cell_t *params cell_t Native_AsyncSocket_Write(IPluginContext *pContext, const cell_t *params) { - CAsyncSocketContext *pAsyncContext = g_AsyncSocket.GetSocketInstanceByHandle(params[1]); + CAsyncSocketContext *pSocketContext = g_AsyncSocket.GetSocketInstanceByHandle(params[1]); - if(pAsyncContext == NULL) + if(pSocketContext == NULL) return pContext->ThrowNativeError("Invalid socket handle"); + if(!pSocketContext->m_pStream) + return pContext->ThrowNativeError("Socket is not connected"); + char *data = NULL; pContext->LocalToString(params[2], &data); @@ -320,7 +572,7 @@ cell_t Native_AsyncSocket_Write(IPluginContext *pContext, const cell_t *params) CAsyncWrite *pWrite = (CAsyncWrite *)malloc(sizeof(CAsyncWrite)); - pWrite->pAsyncContext = pAsyncContext; + pWrite->pSocketContext = pSocketContext; pWrite->pBuffer = buffer; CAsyncAddJob Job; @@ -335,12 +587,12 @@ cell_t Native_AsyncSocket_Write(IPluginContext *pContext, const cell_t *params) cell_t Native_AsyncSocket_SetConnectCallback(IPluginContext *pContext, const cell_t *params) { - CAsyncSocketContext *pAsyncContext = g_AsyncSocket.GetSocketInstanceByHandle(params[1]); + CAsyncSocketContext *pSocketContext = g_AsyncSocket.GetSocketInstanceByHandle(params[1]); - if(pAsyncContext == NULL) + if(pSocketContext == NULL) return pContext->ThrowNativeError("Invalid socket handle"); - if(!pAsyncContext->SetConnectCallback(params[2])) + if(!pSocketContext->SetConnectCallback(params[2])) return pContext->ThrowNativeError("Invalid callback"); return true; @@ -348,12 +600,12 @@ cell_t Native_AsyncSocket_SetConnectCallback(IPluginContext *pContext, const cel cell_t Native_AsyncSocket_SetErrorCallback(IPluginContext *pContext, const cell_t *params) { - CAsyncSocketContext *pAsyncContext = g_AsyncSocket.GetSocketInstanceByHandle(params[1]); + CAsyncSocketContext *pSocketContext = g_AsyncSocket.GetSocketInstanceByHandle(params[1]); - if(pAsyncContext == NULL) + if(pSocketContext == NULL) return pContext->ThrowNativeError("Invalid socket handle"); - if(!pAsyncContext->SetErrorCallback(params[2])) + if(!pSocketContext->SetErrorCallback(params[2])) return pContext->ThrowNativeError("Invalid callback"); return true; @@ -361,12 +613,12 @@ cell_t Native_AsyncSocket_SetErrorCallback(IPluginContext *pContext, const cell_ cell_t Native_AsyncSocket_SetDataCallback(IPluginContext *pContext, const cell_t *params) { - CAsyncSocketContext *pAsyncContext = g_AsyncSocket.GetSocketInstanceByHandle(params[1]); + CAsyncSocketContext *pSocketContext = g_AsyncSocket.GetSocketInstanceByHandle(params[1]); - if(pAsyncContext == NULL) + if(pSocketContext == NULL) return pContext->ThrowNativeError("Invalid socket handle"); - if(!pAsyncContext->SetDataCallback(params[2])) + if(!pSocketContext->SetDataCallback(params[2])) return pContext->ThrowNativeError("Invalid callback"); return true; @@ -375,8 +627,10 @@ cell_t Native_AsyncSocket_SetDataCallback(IPluginContext *pContext, const cell_t // Sourcemod Plugin Events bool AsyncSocket::SDK_OnLoad(char *error, size_t maxlength, bool late) { + g_Running = true; + g_MallocHandles = 0; sharesys->AddNatives(myself, AsyncSocketNatives); - sharesys->RegisterLibrary(myself, "async_socket"); + sharesys->RegisterLibrary(myself, "AsyncSocket"); socketHandleType = handlesys->CreateType("AsyncSocket", this, 0, NULL, NULL, myself->GetIdentity(), NULL); @@ -385,28 +639,46 @@ bool AsyncSocket::SDK_OnLoad(char *error, size_t maxlength, bool late) g_UV_Loop = uv_default_loop(); uv_async_init(g_UV_Loop, &g_UV_AsyncAdded, UV_OnAsyncAdded); + g_UV_AsyncAdded.close_cb = NULL; uv_thread_create(&g_UV_LoopThread, UV_EventLoop, NULL); return true; } +void UV_OnWalk(uv_handle_t *pHandle, void *pArg) +{ + uv_close(pHandle, pHandle->close_cb); +} + void AsyncSocket::SDK_OnUnload() { - handlesys->RemoveType(socketHandleType, NULL); + g_Running = false; + handlesys->RemoveType(socketHandleType, myself->GetIdentity()); - uv_close((uv_handle_t *)&g_UV_AsyncAdded, NULL); + CAsyncAddJob Job; + Job.CallbackFn = UV_Quit; + Job.pData = NULL; + g_AsyncAddQueue.enqueue(Job); + + uv_async_send(&g_UV_AsyncAdded); uv_thread_join(&g_UV_LoopThread); + uv_walk(g_UV_Loop, UV_OnWalk, NULL); + + uv_run(g_UV_Loop, UV_RUN_DEFAULT); + uv_loop_close(g_UV_Loop); smutils->RemoveGameFrameHook(OnGameFrame); + printf("g_MallocHandles = %d\n", g_MallocHandles); } const sp_nativeinfo_t AsyncSocketNatives[] = { {"AsyncSocket.AsyncSocket", Native_AsyncSocket_Create}, {"AsyncSocket.Connect", Native_AsyncSocket_Connect}, + {"AsyncSocket.Listen", Native_AsyncSocket_Listen}, {"AsyncSocket.Write", Native_AsyncSocket_Write}, {"AsyncSocket.SetConnectCallback", Native_AsyncSocket_SetConnectCallback}, {"AsyncSocket.SetErrorCallback", Native_AsyncSocket_SetErrorCallback}, diff --git a/extension/extension.h b/extension/extension.h index afa68d9..237d64e 100644 --- a/extension/extension.h +++ b/extension/extension.h @@ -50,23 +50,47 @@ struct CAsyncAddJob struct CAsyncWrite { - CAsyncSocketContext *pAsyncContext; + CAsyncSocketContext *pSocketContext; uv_buf_t *pBuffer; }; +struct CSocketConnect +{ + CAsyncSocketContext *pSocketContext; + uv_stream_t *pClientSocket; +}; + struct CSocketData { - CAsyncSocketContext *pAsyncContext; + CAsyncSocketContext *pSocketContext; char *pBuffer; ssize_t BufferSize; }; struct CSocketError { - CAsyncSocketContext *pAsyncContext; + CAsyncSocketContext *pSocketContext; int Error; }; +void UV_EventLoop(void *data); +void UV_OnAsyncAdded(uv_async_t *pHandle); +void UV_FreeHandle(uv_handle_t *handle); +void UV_AllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf); +void UV_Quit(uv_async_t *pHandle); +void UV_DeleteAsyncContext(uv_async_t *pHandle); +void UV_PushError(CAsyncSocketContext *pContext, int error); +void UV_OnRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf); +void UV_OnConnect(uv_connect_t *req, int status); +void UV_StartRead(uv_async_t *pHandle); +void UV_OnNewConnection(uv_stream_t *server, int status); +void UV_OnAsyncResolved(uv_getaddrinfo_t *resolver, int status, struct addrinfo *res); +void UV_OnAsyncResolve(uv_async_t *handle); +void UV_OnAsyncWriteCleanup(uv_write_t *req, int status); +void UV_OnAsyncWrite(uv_async_t *handle); +void UV_OnWalk(uv_handle_t *pHandle, void *pArg); + + /** * @brief Sample implementation of the SDK Extension. * Note: Uncomment one of the pre-defined virtual functions in order to use it. diff --git a/extension/smsdk_config.h b/extension/smsdk_config.h index 12bd617..4ebd850 100644 --- a/extension/smsdk_config.h +++ b/extension/smsdk_config.h @@ -39,11 +39,11 @@ /* Basic information exposed publicly */ #define SMEXT_CONF_NAME "Async Socket Extension" -#define SMEXT_CONF_DESCRIPTION "Sample extension to help developers" -#define SMEXT_CONF_VERSION "1.0" -#define SMEXT_CONF_AUTHOR "Nikki" -#define SMEXT_CONF_URL "http://probablyaserver.com/" -#define SMEXT_CONF_LOGTAG "ASYNC" +#define SMEXT_CONF_DESCRIPTION "Async TCP server and client" +#define SMEXT_CONF_VERSION "0.1" +#define SMEXT_CONF_AUTHOR "Nikki + BotoX" +#define SMEXT_CONF_URL "https://github.com/CSSZombieEscape/sm-ext-AsyncSocket" +#define SMEXT_CONF_LOGTAG "ASYNCSOCKET" #define SMEXT_CONF_LICENSE "GPL" #define SMEXT_CONF_DATESTRING __DATE__ @@ -56,7 +56,7 @@ * @brief Sets whether or not this plugin required Metamod. * NOTE: Uncomment to enable, comment to disable. */ -//#define SMEXT_CONF_METAMOD +//#define SMEXT_CONF_METAMOD /** Enable interfaces you want to use here by uncommenting lines */ #define SMEXT_ENABLE_FORWARDSYS diff --git a/pawn/async_socket.inc b/pawn/AsyncSocket.inc similarity index 57% rename from pawn/async_socket.inc rename to pawn/AsyncSocket.inc index 53679e4..b99ab76 100644 --- a/pawn/async_socket.inc +++ b/pawn/AsyncSocket.inc @@ -1,10 +1,7 @@ -/* -** -*/ -#if defined _async_socket_included +#if defined _AsyncSocket_included #endinput #endif -#define _async_socket_included +#define _AsyncSocket_included typedef AsyncSocketConnectCallback = function void(AsyncSocket socket); @@ -14,31 +11,27 @@ typedef AsyncSocketDataCallback = function void(AsyncSocket socket, const char[] methodmap AsyncSocket < Handle { public native AsyncSocket(); - + public native bool Connect(const char[] host, const int port); - + + public native bool Listen(const char[] host, const int port); + public native bool Write(const char[] data); - + public native bool SetConnectCallback(AsyncSocketConnectCallback callback); - + public native bool SetErrorCallback(AsyncSocketErrorCallback callback); - + public native bool SetDataCallback(AsyncSocketDataCallback callback); } -/* -#if !defined REQUIRE_EXTENSIONS -public __ext_INTERFACE_SetNTVOptional() +/** + * Do not edit below this line! + */ +public Extension __ext_AsyncSocket = { - MarkNativeAsOptional(""); -} -#endif -*/ - -public Extension __ext_INTERFACE = -{ - name = "async_socket", - file = "async_socket.ext", + name = "AsyncSocket", + file = "AsyncSocket.ext", #if defined AUTOLOAD_EXTENSIONS autoload = 1, #else @@ -50,3 +43,16 @@ public Extension __ext_INTERFACE = required = 0, #endif }; + +#if !defined REQUIRE_EXTENSIONS +public __ext_AsyncSocket_SetNTVOptional() +{ + MarkNativeAsOptional("AsyncSocket.AsyncSocket"); + MarkNativeAsOptional("AsyncSocket.Connect"); + MarkNativeAsOptional("AsyncSocket.Listen"); + MarkNativeAsOptional("AsyncSocket.Write"); + MarkNativeAsOptional("AsyncSocket.SetConnectCallback"); + MarkNativeAsOptional("AsyncSocket.SetErrorCallback"); + MarkNativeAsOptional("AsyncSocket.SetDataCallback"); +} +#endif