/** * vim: set ts=4 : * ============================================================================= * SourceMod Sample Extension * Copyright (C) 2004-2008 AlliedModders LLC. All rights reserved. * ============================================================================= * * This program is free software; you can redistribute it and/or modify it under * the terms of the GNU General Public License, version 3.0, as published by the * Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more * details. * * You should have received a copy of the GNU General Public License along with * this program. If not, see . * * As a special exception, AlliedModders LLC gives you permission to link the * code of this program (as well as its derivative works) to "Half-Life 2," the * "Source Engine," the "SourcePawn JIT," and any Game MODs that run on software * by the Valve Corporation. You must obey the GNU General Public License in * all respects for all other code used. Additionally, AlliedModders LLC grants * this exception to all derivative works. AlliedModders LLC defines further * exceptions, found in LICENSE.txt (as of this writing, version JULY-31-2007), * or . * * Version: $Id$ */ #include "extension.h" #include "context.h" #include "readerwriterqueue.h" #include /** * @file extension.cpp * @brief Implement extension code here. */ moodycamel::ReaderWriterQueue g_ConnectQueue; moodycamel::ReaderWriterQueue g_ErrorQueue; moodycamel::ReaderWriterQueue g_DataQueue; uv_loop_t *g_UV_Loop; uv_thread_t g_UV_LoopThread; uv_async_t g_UV_AsyncAdded; moodycamel::ReaderWriterQueue g_AsyncAddQueue; int g_MallocHandles = 0; bool g_Running; AsyncSocket g_AsyncSocket; /**< Global singleton for extension's main interface */ SMEXT_LINK(&g_AsyncSocket); CAsyncSocketContext *AsyncSocket::GetSocketInstanceByHandle(Handle_t handle) { HandleSecurity sec; sec.pOwner = NULL; sec.pIdentity = myself->GetIdentity(); CAsyncSocketContext *pSocketContext; if(handlesys->ReadHandle(handle, socketHandleType, &sec, (void **)&pSocketContext) != HandleError_None) return NULL; return pSocketContext; } void AsyncSocket::OnHandleDestroy(HandleType_t type, void *object) { if(object != NULL) { CAsyncSocketContext *pSocketContext = (CAsyncSocketContext *)object; pSocketContext->m_Deleted = true; if(g_Running && (pSocketContext->m_pSocket || pSocketContext->m_pStream || pSocketContext->m_PendingCallback)) { printf("Delete: Async\n"); CAsyncAddJob Job; Job.CallbackFn = UV_DeleteAsyncContext; Job.pData = pSocketContext; g_AsyncAddQueue.enqueue(Job); uv_async_send(&g_UV_AsyncAdded); } else { printf("Delete: Now\n"); delete pSocketContext; } } } void OnGameFrame(bool simulating) { CSocketConnect *pConnect; while(g_ConnectQueue.try_dequeue(pConnect)) { printf("g_ConnectQueue got: "); if(pConnect->pSocketContext->m_Server) { printf("server\n"); CAsyncSocketContext *pSocketContext = new CAsyncSocketContext(pConnect->pSocketContext->m_pContext); pSocketContext->m_Handle = handlesys->CreateHandle(g_AsyncSocket.socketHandleType, pSocketContext, pConnect->pSocketContext->m_pContext->GetIdentity(), myself->GetIdentity(), NULL); pSocketContext->m_pStream = pConnect->pClientSocket; pSocketContext->m_pStream->data = pSocketContext; pConnect->pSocketContext->OnConnect(pSocketContext); if(!pSocketContext->m_Deleted) { printf("add UV_StartRead\n"); CAsyncAddJob Job; Job.CallbackFn = UV_StartRead; Job.pData = pSocketContext; g_AsyncAddQueue.enqueue(Job); uv_async_send(&g_UV_AsyncAdded); } } else { printf("client\n"); pConnect->pSocketContext->Connected(); } free(pConnect); } CSocketData *pData; while(g_DataQueue.try_dequeue(pData)) { pData->pSocketContext->OnData(pData->pBuffer, pData->BufferSize); free(pData->pBuffer); free(pData); } CSocketError *pError; while(g_ErrorQueue.try_dequeue(pError)) { pError->pSocketContext->OnError(pError->Error); free(pError); } } // main event loop thread void UV_EventLoop(void *data) { uv_run(g_UV_Loop, UV_RUN_DEFAULT); } void UV_OnAsyncAdded(uv_async_t *pHandle) { CAsyncAddJob Job; while(g_AsyncAddQueue.try_dequeue(Job)) { printf("dequeing job\n"); uv_async_t *pAsync = (uv_async_t *)malloc(sizeof(uv_async_t)); g_MallocHandles++; uv_async_init(g_UV_Loop, pAsync, Job.CallbackFn); pAsync->data = Job.pData; pAsync->close_cb = UV_FreeHandle; uv_async_send(pAsync); } } void UV_FreeHandle(uv_handle_t *handle) { printf("freeing %p\n", handle); free(handle); g_MallocHandles--; } void UV_AllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { buf->base = (char *)malloc(suggested_size); buf->len = suggested_size; } void UV_Quit(uv_async_t *pHandle) { printf("quithandle: %p\n", pHandle); uv_close((uv_handle_t *)pHandle, pHandle->close_cb); uv_close((uv_handle_t *)&g_UV_AsyncAdded, NULL); uv_stop(g_UV_Loop); } void UV_DeleteAsyncContext(uv_async_t *pHandle) { CAsyncSocketContext *pSocketContext = (CAsyncSocketContext *)pHandle->data; uv_close((uv_handle_t *)pHandle, pHandle->close_cb); if(pSocketContext->m_pStream) { uv_close((uv_handle_t *)pSocketContext->m_pStream, pSocketContext->m_pStream->close_cb); pSocketContext->m_pStream = NULL; pSocketContext->m_pSocket = NULL; } if(pSocketContext->m_pSocket) { uv_close((uv_handle_t *)pSocketContext->m_pSocket, pSocketContext->m_pSocket->close_cb); pSocketContext->m_pSocket = NULL; } delete pSocketContext; } void UV_PushError(CAsyncSocketContext *pSocketContext, int error) { pSocketContext->m_PendingCallback = true; CSocketError *pError = (CSocketError *)malloc(sizeof(CSocketError)); pError->pSocketContext = pSocketContext; pError->Error = error; g_ErrorQueue.enqueue(pError); } void UV_OnRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { printf("UV_OnRead()\n"); CAsyncSocketContext *pSocketContext = (CAsyncSocketContext *)client->data; if(pSocketContext->m_Deleted) { printf("m_Deleted\n"); free(buf->base); uv_close((uv_handle_t *)client, client->close_cb); pSocketContext->m_pStream = NULL; pSocketContext->m_pSocket = NULL; return; } if(nread < 0) { printf("nread < 0\n"); // Connection closed free(buf->base); // But let the client disconnect. //uv_close((uv_handle_t *)client, client->close_cb); //pSocketContext->m_pStream = NULL; //pSocketContext->m_pSocket = NULL; UV_PushError(pSocketContext, nread); return; } pSocketContext->m_PendingCallback = true; printf("nread = %d\n", nread); char *data = (char *)malloc(sizeof(char) * (nread + 1)); data[nread] = 0; strncpy(data, buf->base, nread); free(buf->base); CSocketData *pData = (CSocketData *)malloc(sizeof(CSocketData)); pData->pSocketContext = pSocketContext; pData->pBuffer = data; pData->BufferSize = nread; g_DataQueue.enqueue(pData); } void UV_OnConnect(uv_connect_t *req, int status) { CAsyncSocketContext *pSocketContext = (CAsyncSocketContext *)req->data; if(pSocketContext->m_Deleted) { free(req); uv_close((uv_handle_t *)req->handle, req->handle->close_cb); return; } if(status < 0) { free(req); UV_PushError(pSocketContext, status); return; } pSocketContext->m_PendingCallback = true; pSocketContext->m_pStream = req->handle; free(req); pSocketContext->m_pStream->data = pSocketContext; CSocketConnect *pConnect = (CSocketConnect *)malloc(sizeof(CSocketConnect)); pConnect->pSocketContext = pSocketContext; pConnect->pClientSocket = pSocketContext->m_pStream; g_ConnectQueue.enqueue(pConnect); uv_read_start(pSocketContext->m_pStream, UV_AllocBuffer, UV_OnRead); } void UV_StartRead(uv_async_t *pHandle) { printf("UV_StartRead()\n"); CAsyncSocketContext *pSocketContext = (CAsyncSocketContext *)pHandle->data; uv_close((uv_handle_t *)pHandle, pHandle->close_cb); if(pSocketContext->m_Deleted || !pSocketContext->m_pStream) { printf("UV_StartRead() -> error\n"); return; } int err = uv_read_start(pSocketContext->m_pStream, UV_AllocBuffer, UV_OnRead); printf("~UV_StartRead() = %d %s\n", err, uv_err_name(err)); } void UV_OnNewConnection(uv_stream_t *server, int status) { printf("UV_OnNewConnection()\n"); // server context CAsyncSocketContext *pSocketContext = (CAsyncSocketContext *)server->data; if(pSocketContext->m_Deleted) { printf("m_Deleted()\n"); uv_close((uv_handle_t *)server, server->close_cb); return; } if(status < 0) { printf("status < 0\n"); uv_close((uv_handle_t *)server, server->close_cb); //uv_close((uv_handle_t *)pSocketContext->m_pSocket, pSocketContext->m_pSocket->close_cb); UV_PushError(pSocketContext, status); return; } uv_tcp_t *pClientSocket = (uv_tcp_t *)malloc(sizeof(uv_tcp_t)); g_MallocHandles++; uv_tcp_init(g_UV_Loop, pClientSocket); pClientSocket->close_cb = UV_FreeHandle; if(uv_accept((uv_stream_t *)pSocketContext->m_pSocket, (uv_stream_t *)pClientSocket) == 0) { printf("accepted\n"); pSocketContext->m_PendingCallback = true; CSocketConnect *pConnect = (CSocketConnect *)malloc(sizeof(CSocketConnect)); pConnect->pSocketContext = pSocketContext; pConnect->pClientSocket = (uv_stream_t *)pClientSocket; g_ConnectQueue.enqueue(pConnect); } else { printf("accept failed\n"); uv_close((uv_handle_t *)pClientSocket, pClientSocket->close_cb); } } void UV_OnAsyncResolved(uv_getaddrinfo_t *resolver, int status, struct addrinfo *res) { if(resolver->service != NULL) free(resolver->service); CAsyncSocketContext *pSocketContext = (CAsyncSocketContext *)resolver->data; if(pSocketContext->m_Deleted || pSocketContext->m_pSocket) { uv_freeaddrinfo(res); return; } if(status < 0) { pSocketContext->m_Pending = false; uv_freeaddrinfo(res); UV_PushError(pSocketContext, status); return; } uv_connect_t *pConnectReq = (uv_connect_t *)malloc(sizeof(uv_connect_t)); pConnectReq->data = pSocketContext; uv_tcp_t *pSocket = (uv_tcp_t *)malloc(sizeof(uv_tcp_t)); g_MallocHandles++; uv_tcp_init(g_UV_Loop, pSocket); pSocket->close_cb = UV_FreeHandle; pSocket->data = pSocketContext; pSocketContext->m_pSocket = pSocket; pSocketContext->m_Pending = false; if(pSocketContext->m_Server) { printf("new server\n"); uv_tcp_bind(pSocket, (const struct sockaddr *)res->ai_addr, 0); int err = uv_listen((uv_stream_t *)pSocket, 32, UV_OnNewConnection); if(err) { printf("yes, new server\n"); uv_freeaddrinfo(res); uv_close((uv_handle_t *)pSocket, pSocket->close_cb); UV_PushError(pSocketContext, err); return; } } else { printf("connect client\n"); uv_tcp_connect(pConnectReq, pSocket, (const struct sockaddr *)res->ai_addr, UV_OnConnect); } uv_freeaddrinfo(res); } void UV_OnAsyncResolve(uv_async_t *pHandle) { CAsyncSocketContext *pSocketContext = (CAsyncSocketContext *)pHandle->data; uv_close((uv_handle_t *)pHandle, pHandle->close_cb); if(pSocketContext->m_Deleted || pSocketContext->m_pSocket) return; pSocketContext->m_Resolver.data = pSocketContext; char *service = (char *)malloc(8); sprintf(service, "%d", pSocketContext->m_Port); struct addrinfo hints; hints.ai_family = PF_INET; hints.ai_socktype = SOCK_STREAM; hints.ai_protocol = IPPROTO_TCP; hints.ai_flags = 0; int err = uv_getaddrinfo(g_UV_Loop, &pSocketContext->m_Resolver, UV_OnAsyncResolved, pSocketContext->m_pHost, service, &hints); if(err) { if(service != NULL) free(service); UV_PushError(pSocketContext, err); } } void UV_OnAsyncWriteCleanup(uv_write_t *req, int status) { CAsyncWrite *pWrite = (CAsyncWrite *)req->data; free(pWrite->pBuffer->base); free(pWrite->pBuffer); free(pWrite); free(req); } void UV_OnAsyncWrite(uv_async_t *handle) { printf("Write 1\n"); CAsyncWrite *pWrite = (CAsyncWrite *)handle->data; uv_close((uv_handle_t *)handle, handle->close_cb); if(pWrite == NULL || pWrite->pBuffer == NULL) return; if(pWrite->pSocketContext == NULL || pWrite->pSocketContext->m_pStream == NULL) { free(pWrite->pBuffer->base); free(pWrite->pBuffer); free(pWrite); return; } uv_write_t *req = (uv_write_t *)malloc(sizeof(uv_write_t)); req->data = pWrite; printf("Write 2\n"); uv_write(req, pWrite->pSocketContext->m_pStream, pWrite->pBuffer, 1, UV_OnAsyncWriteCleanup); } cell_t Native_AsyncSocket_Create(IPluginContext *pContext, const cell_t *params) { CAsyncSocketContext *pSocketContext = new CAsyncSocketContext(pContext); pSocketContext->m_Handle = handlesys->CreateHandle(g_AsyncSocket.socketHandleType, pSocketContext, pContext->GetIdentity(), myself->GetIdentity(), NULL); return pSocketContext->m_Handle; } cell_t Native_AsyncSocket_Connect(IPluginContext *pContext, const cell_t *params) { CAsyncSocketContext *pSocketContext = g_AsyncSocket.GetSocketInstanceByHandle(params[1]); if(pSocketContext == NULL) return pContext->ThrowNativeError("Invalid socket handle"); if(params[3] < 0 || params[3] > 65535) return pContext->ThrowNativeError("Invalid port specified"); if(pSocketContext->m_pSocket) return pContext->ThrowNativeError("Socket is already connected"); if(pSocketContext->m_Pending) return pContext->ThrowNativeError("Socket is currently pending"); char *address = NULL; pContext->LocalToString(params[2], &address); pSocketContext->m_pHost = strdup(address); pSocketContext->m_Port = params[3]; pSocketContext->m_Server = false; pSocketContext->m_Pending = true; CAsyncAddJob Job; Job.CallbackFn = UV_OnAsyncResolve; Job.pData = pSocketContext; g_AsyncAddQueue.enqueue(Job); uv_async_send(&g_UV_AsyncAdded); return 1; } cell_t Native_AsyncSocket_Listen(IPluginContext *pContext, const cell_t *params) { CAsyncSocketContext *pSocketContext = g_AsyncSocket.GetSocketInstanceByHandle(params[1]); if(pSocketContext == NULL) return pContext->ThrowNativeError("Invalid socket handle"); if(params[3] < 0 || params[3] > 65535) return pContext->ThrowNativeError("Invalid port specified"); if(pSocketContext->m_pSocket) return pContext->ThrowNativeError("Socket is already connected"); if(pSocketContext->m_Pending) return pContext->ThrowNativeError("Socket is currently pending"); char *address = NULL; pContext->LocalToString(params[2], &address); pSocketContext->m_pHost = strdup(address); pSocketContext->m_Port = params[3]; pSocketContext->m_Server = true; pSocketContext->m_Pending = true; CAsyncAddJob Job; Job.CallbackFn = UV_OnAsyncResolve; Job.pData = pSocketContext; g_AsyncAddQueue.enqueue(Job); uv_async_send(&g_UV_AsyncAdded); return 1; } cell_t Native_AsyncSocket_Write(IPluginContext *pContext, const cell_t *params) { CAsyncSocketContext *pSocketContext = g_AsyncSocket.GetSocketInstanceByHandle(params[1]); if(pSocketContext == NULL) return pContext->ThrowNativeError("Invalid socket handle"); if(!pSocketContext->m_pStream) return pContext->ThrowNativeError("Socket is not connected"); char *data = NULL; pContext->LocalToString(params[2], &data); uv_buf_t* buffer = (uv_buf_t *)malloc(sizeof(uv_buf_t)); buffer->base = strdup(data); buffer->len = strlen(data); CAsyncWrite *pWrite = (CAsyncWrite *)malloc(sizeof(CAsyncWrite)); pWrite->pSocketContext = pSocketContext; pWrite->pBuffer = buffer; CAsyncAddJob Job; Job.CallbackFn = UV_OnAsyncWrite; Job.pData = pWrite; g_AsyncAddQueue.enqueue(Job); uv_async_send(&g_UV_AsyncAdded); return 1; } cell_t Native_AsyncSocket_SetConnectCallback(IPluginContext *pContext, const cell_t *params) { CAsyncSocketContext *pSocketContext = g_AsyncSocket.GetSocketInstanceByHandle(params[1]); if(pSocketContext == NULL) return pContext->ThrowNativeError("Invalid socket handle"); if(!pSocketContext->SetConnectCallback(params[2])) return pContext->ThrowNativeError("Invalid callback"); return true; } cell_t Native_AsyncSocket_SetErrorCallback(IPluginContext *pContext, const cell_t *params) { CAsyncSocketContext *pSocketContext = g_AsyncSocket.GetSocketInstanceByHandle(params[1]); if(pSocketContext == NULL) return pContext->ThrowNativeError("Invalid socket handle"); if(!pSocketContext->SetErrorCallback(params[2])) return pContext->ThrowNativeError("Invalid callback"); return true; } cell_t Native_AsyncSocket_SetDataCallback(IPluginContext *pContext, const cell_t *params) { CAsyncSocketContext *pSocketContext = g_AsyncSocket.GetSocketInstanceByHandle(params[1]); if(pSocketContext == NULL) return pContext->ThrowNativeError("Invalid socket handle"); if(!pSocketContext->SetDataCallback(params[2])) return pContext->ThrowNativeError("Invalid callback"); return true; } // Sourcemod Plugin Events bool AsyncSocket::SDK_OnLoad(char *error, size_t maxlength, bool late) { g_Running = true; g_MallocHandles = 0; sharesys->AddNatives(myself, AsyncSocketNatives); sharesys->RegisterLibrary(myself, "AsyncSocket"); socketHandleType = handlesys->CreateType("AsyncSocket", this, 0, NULL, NULL, myself->GetIdentity(), NULL); smutils->AddGameFrameHook(OnGameFrame); g_UV_Loop = uv_default_loop(); uv_async_init(g_UV_Loop, &g_UV_AsyncAdded, UV_OnAsyncAdded); g_UV_AsyncAdded.close_cb = NULL; uv_thread_create(&g_UV_LoopThread, UV_EventLoop, NULL); return true; } void UV_OnWalk(uv_handle_t *pHandle, void *pArg) { uv_close(pHandle, pHandle->close_cb); } void AsyncSocket::SDK_OnUnload() { g_Running = false; handlesys->RemoveType(socketHandleType, myself->GetIdentity()); CAsyncAddJob Job; Job.CallbackFn = UV_Quit; Job.pData = NULL; g_AsyncAddQueue.enqueue(Job); uv_async_send(&g_UV_AsyncAdded); uv_thread_join(&g_UV_LoopThread); uv_walk(g_UV_Loop, UV_OnWalk, NULL); uv_run(g_UV_Loop, UV_RUN_DEFAULT); uv_loop_close(g_UV_Loop); smutils->RemoveGameFrameHook(OnGameFrame); printf("g_MallocHandles = %d\n", g_MallocHandles); } const sp_nativeinfo_t AsyncSocketNatives[] = { {"AsyncSocket.AsyncSocket", Native_AsyncSocket_Create}, {"AsyncSocket.Connect", Native_AsyncSocket_Connect}, {"AsyncSocket.Listen", Native_AsyncSocket_Listen}, {"AsyncSocket.Write", Native_AsyncSocket_Write}, {"AsyncSocket.SetConnectCallback", Native_AsyncSocket_SetConnectCallback}, {"AsyncSocket.SetErrorCallback", Native_AsyncSocket_SetErrorCallback}, {"AsyncSocket.SetDataCallback", Native_AsyncSocket_SetDataCallback}, {NULL, NULL} };