// Fill out your copyright notice in the Description page of Project Settings.

#pragma once

#include "CoreMinimal.h"
#include "Engine/GameInstance.h"
#include "CoreMinimal.h"
#include "GameFramework/Actor.h"
#include "HAL/Runnable.h"
#include "HAL/ThreadSafeBool.h"
#include "Containers/Queue.h"
#include "UObject/WeakObjectPtrTemplates.h"
#include "NewTcpClient.generated.h"


struct FIPv4AddressEx
{
    union
    {
        /** The IP address value as A.B.C.D components. */
        struct
        {
#if PLATFORM_LITTLE_ENDIAN
#ifdef _MSC_VER
            uint8 D, C, B, A;
#else
            uint8 D GCC_ALIGN(4);
            uint8 C, B, A;
#endif
#else
            uint8 A, B, C, D;
#endif
        };

        /** The IP address value in host byte order. */
        uint32 Value;
    };
    static bool Parse(const FString& AddressString, FIPv4AddressEx& OutAddress);
};


DECLARE_DYNAMIC_DELEGATE_OneParam(FTcpSocketDisconnectDelegate, int32, ConnectionId);
DECLARE_DYNAMIC_DELEGATE_OneParam(FTcpSocketConnectDelegate, int32, ConnectionId);
DECLARE_DYNAMIC_DELEGATE_OneParam(FTcpSocketReceivedByteMessageDelegate,UPARAM(ref)  TArray<uint8>&, Message);  //
//DECLARE_DYNAMIC_DELEGATE_OneParam(FTcpSocketReceivedStringMessageDelegate,UPARAM(ref) FString&, Message); //
DECLARE_DYNAMIC_DELEGATE_OneParam(FTcpSocketReceivedStringMessageDelegate,FString,MessageString);

class FyzTcpClient :public FRunnable
{
public:
    FyzTcpClient();
    ~FyzTcpClient();

    /** Thread to run the worker FRunnable on */
    FRunnableThread* m_Thread = nullptr;


    FSocket* m_Socket;
    FString m_ipAddress;
    int m_port;

    // SPSC = single producer, single consumer.
    TQueue<TArray<uint8>, EQueueMode::Mpsc> Inbox; // Messages we read from socket and send to main thread. Runner thread is producer, main thread is consumer.
    TQueue<TArray<uint8>, EQueueMode::Mpsc> Outbox; // Messages to send to socket from main thread. Main thread is producer, runner thread is consumer.


    int32 m_RecvBufferSize;
    int32 m_ActualRecvBufferSize;
    int32 m_SendBufferSize;
    int32 m_ActualSendBufferSize;
    float m_TimeBetweenTicks;
    bool m_bConnected = false;

    void Connect(const FString& ipAddress, int32 port);
    

    void ReConnect();

    // Begin FRunnable interface.
    virtual bool Init() override;
    virtual uint32 Run() override;
    virtual void Stop() override;
    virtual void Exit() override;
    // End FRunnable interface    

    /** Shuts down the thread */
    void SocketShutdown();

    /* Getter for bConnected */
    bool isConnected();

    void UpdateRecvMessage();
    bool ReadFromInbox(TArray<uint8>& byteArray);
    void ExecuteOnMessageReceived();



    static FString Message_ReadString(TArray<uint8>& Message, int32 BytesLength);
    static TArray<uint8> Conv_StringToBytes(const FString& InStr);
    static TArray<uint8> Concat_BytesBytes(TArray<uint8> A, TArray<uint8> B);
    static TArray<uint8> Conv_IntToBytes(int32 InInt);
    static TArray<uint8> Conv_FloatToBytes(float InFloat);
    static TArray<uint8> Conv_ByteToBytes(uint8 InByte);
    static int32 Message_ReadInt(TArray<uint8>& Message);
    static uint8 Message_ReadByte(TArray<uint8>& Message);
    static bool Message_ReadBytes(int32 NumBytes,TArray<uint8>& Message, TArray<uint8>& ReturnArray);
    static float Message_ReadFloat(TArray<uint8>& Message);
    


    FTcpSocketDisconnectDelegate DisconnectedDelegate;
    FTcpSocketConnectDelegate ConnectedDelegate;
    FTcpSocketReceivedByteMessageDelegate MessageReceivedByteDelegate;
    FTcpSocketReceivedStringMessageDelegate     m_TcpRecvDelegate;
    /* Blocking send */
    bool BlockingSend(const uint8* Data, int32 BytesToSend);
    void AddToOutbox(TArray<uint8> Message);
private:
    

    double m_LastReConnectTime;
    /** thread should continue running */
    FThreadSafeBool m_bRun = false;

    /** Critical section preventing multiple threads from sending simultaneously */
    FCriticalSection SendCriticalSection;
};


/**
 * 
 */
UCLASS()
class TESTPROJECT_API UNewTcpClient : public UGameInstance
{
    GENERATED_BODY()
    
public:
    FyzTcpClient  m_TcpClient;


    UFUNCTION(BlueprintCallable, Category = "MySocket")
    void Setup(FString ip, int port);

    //发消息(主线程)
    UFUNCTION(BlueprintCallable, Category = "MySocket")
    bool SocketSend(FString message);

    UFUNCTION(BlueprintCallable, Category = "MySocket")
    void SetRecvMsgByteArrayDelegate(const FTcpSocketReceivedByteMessageDelegate& OnMessageReceived);

    
    UFUNCTION(BlueprintCallable, Category = "MySocket")
    void SetRecvMsgStringDelegate(const FTcpSocketReceivedStringMessageDelegate& OnMessageReceived);


    UFUNCTION(BlueprintCallable, Category = "MySocket")
    void UpdateRecv();


    UFUNCTION(BlueprintCallable, Category = "MySocket")
    void SendStringToServer(FString str);

    UFUNCTION(BlueprintCallable, Category = "MySocket")
    void SendBytesToServer(TArray<uint8> bytes);

};



// Fill out your copyright notice in the Description page of Project Settings.


#include "NewTcpClient.h"
#include "SocketSubsystem.h"
#include "IPAddress.h"
#include "Sockets.h"
#include "HAL/RunnableThread.h"
#include "Async/Async.h"
#include <string>
#include "CoreMinimal.h"
#include "Engine/GameInstance.h"
#include "SocketSubsystem.h"
//#include "NetWorking/Public/Interfaces/IPv4/IPv4Address.h"
#include "Logging/MessageLog.h"
#include "HAL/UnrealMemory.h"



bool FIPv4AddressEx::Parse(const FString& AddressString, FIPv4AddressEx& OutAddress)
{
    TArray<FString> Tokens;




    if (AddressString.ParseIntoArray(Tokens, TEXT("."), false) == 4)
    {

        OutAddress.A = FCString::Atoi(*Tokens[0]);
        OutAddress.B = FCString::Atoi(*Tokens[1]);
        OutAddress.C = FCString::Atoi(*Tokens[2]);
        OutAddress.D = FCString::Atoi(*Tokens[3]);

        return true;
    }

    return false;
}



FyzTcpClient::FyzTcpClient()
{
    m_RecvBufferSize = 1024;
    m_SendBufferSize = 1024;
    m_TimeBetweenTicks = 0.03;
    m_LastReConnectTime = 0.0f;
    m_bRun = false;
}
FyzTcpClient::~FyzTcpClient()
{
    Stop();
    if (m_Thread)
    {
        m_Thread->WaitForCompletion();
        delete m_Thread;
        m_Thread = nullptr;
    }
}


void FyzTcpClient::Connect(const FString& ipAddresstemp, int32 porttemp)
{
    m_RecvBufferSize = 1024;
    m_SendBufferSize = 1024;
    m_TimeBetweenTicks = 0.016;
    m_LastReConnectTime = 0.0f;

    m_ipAddress = ipAddresstemp;
    m_port = porttemp;

    
    


    Init();

    check(!m_Thread && "Thread wasn't null at the start!");
    check(FPlatformProcess::SupportsMultithreading() && "This platform doesn't support multithreading!");
    if (m_Thread)
    {
        UE_LOG(LogTemp, Log, TEXT("Log: Thread isn't null. It's: %s"), *m_Thread->GetThreadName());
    }
    m_Thread = FRunnableThread::Create(this, *FString::Printf(TEXT("FTcpSocketWorker %s:%d"), *ipAddresstemp, porttemp), 128 * 1024, TPri_Normal);
    UE_LOG(LogTemp, Log, TEXT("Log: Created thread"));
}


bool FyzTcpClient::Init()
{
    m_bRun = true;
    m_bConnected = false;

    m_Socket = NULL;
    return true;
}



void FyzTcpClient::ReConnect()
{
    // if there is still a socket, close it so our peer will get a quick disconnect notification
    if (m_Socket)
    {
        m_Socket->Close();
    }

    m_Socket = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM)->CreateSocket(NAME_Stream, TEXT("default"), false);
    if (!m_Socket)
    {
        UE_LOG(LogTemp, Log, TEXT("FyzTcpClient Create Socket Error"));
        return;
    }

    m_Socket->SetReceiveBufferSize(m_RecvBufferSize, m_ActualRecvBufferSize);
    m_Socket->SetSendBufferSize(m_SendBufferSize, m_ActualSendBufferSize);

    FIPv4AddressEx ip;
    FIPv4AddressEx::Parse(m_ipAddress, ip);

    TSharedRef<FInternetAddr> internetAddr = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM)->CreateInternetAddr();
    internetAddr->SetIp(ip.Value);
    internetAddr->SetPort(m_port);


    double NowTime = FPlatformTime::Seconds();

    m_bConnected = m_Socket->Connect(*internetAddr);
    if (m_bConnected)
    {

        m_LastReConnectTime = NowTime;

        ConnectedDelegate.ExecuteIfBound(0);


        GEngine->AddOnScreenDebugMessage(-1, 5.5f, FColor::Red, FString::Printf(TEXT("TcpClient ReConnect yes %d "),m_bConnected));
    }
    else
    {
        GEngine->AddOnScreenDebugMessage(-1, 5.5f, FColor::Blue, FString::Printf(TEXT("TcpClient ReConnect faild %s %d"), *m_ipAddress, m_port));
        m_LastReConnectTime = NowTime;
        
    }
}


uint32 FyzTcpClient::Run()
{
    
    while (m_bRun)
    {
        FDateTime timeBeginningOfTick = FDateTime::UtcNow();
        
        

        // Connect
        if (!m_bConnected)
        {
            

            double NowTime = FPlatformTime::Seconds();

            if (NowTime - m_LastReConnectTime < 5.0f)
            {
                
                FPlatformProcess::Sleep(0.5);
                continue;
            }

            
            ReConnect();
            m_LastReConnectTime = NowTime;
            continue;
        }

        

        // check if we weren't disconnected from the socket
        m_Socket->SetNonBlocking(true); // set to NonBlocking, because Blocking can't check for a disconnect for some reason
        int32 t_BytesRead;
        uint8 t_Dummy;
        if (!m_Socket->Recv(&t_Dummy, 1, t_BytesRead, ESocketReceiveFlags::Peek))
        {
            m_bConnected = false;
            DisconnectedDelegate.ExecuteIfBound(0);
            GEngine->AddOnScreenDebugMessage(-1, 5.5f, FColor::Red, FString::Printf(TEXT("Dis Connect TcpServer")));
            continue;
        }
        m_Socket->SetNonBlocking(false);    // set back to Blocking

        // if Outbox has something to send, send it
        while (!Outbox.IsEmpty())
        {
            TArray<uint8> toSend;
            Outbox.Dequeue(toSend);

            if (!BlockingSend(toSend.GetData(), toSend.Num()))
            {
                // if sending failed, stop running the thread
                //m_bRun = false;
                continue;
            }
        }

        // if we can read something        
        uint32 PendingDataSize = 0;
        TArray<uint8> receivedData;

        int32 BytesReadTotal = 0;
        // keep going until we have no data.
        for (;;)
        {
            if (!m_Socket->HasPendingData(PendingDataSize))
            {
                // no messages
                break;
            }

            

            receivedData.SetNumUninitialized(BytesReadTotal + PendingDataSize);

            int32 BytesRead = 0;
            if (!m_Socket->Recv(receivedData.GetData() + BytesReadTotal, m_ActualRecvBufferSize, BytesRead))
            {
                // ISocketSubsystem* SocketSubsystem = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM);
                // error code: (int32)SocketSubsystem->GetLastErrorCode()
                
                break;
            }
            BytesReadTotal += BytesRead;

            /* TODO: if we have more PendingData than we could read, continue the while loop so that we can send messages if we have any, and then keep recving*/
        }

        // if we received data, inform the main thread about it, so it can read TQueue
        if (receivedData.Num() != 0)
        {
            Inbox.Enqueue(receivedData);
            
        }

        

        /* In order to sleep, we will account for how much this tick took due to sending and receiving */
        FDateTime timeEndOfTick = FDateTime::UtcNow();
        FTimespan tickDuration = timeEndOfTick - timeBeginningOfTick;
        float secondsThisTickTook = tickDuration.GetTotalSeconds();
        float timeToSleep = m_TimeBetweenTicks - secondsThisTickTook;
        if (timeToSleep > 0.0f)
        {
            //AsyncTask(ENamedThreads::GameThread, [timeToSleep]() { ATcpSocketConnection::PrintToConsole(FString::Printf(TEXT("Sleeping: %f seconds"), timeToSleep), false); });
            FPlatformProcess::Sleep(timeToSleep);
        }
    }

    m_bConnected = false;

    //AsyncTask(ENamedThreads::GameThread, [this]() {ThreadSpawnerActor.Get()->ExecuteOnDisconnected(id, ThreadSpawnerActor);});

    SocketShutdown();
    return 0;
}

void FyzTcpClient::Stop()
{
    m_bRun = false;
}

bool FyzTcpClient::isConnected()
{
    FScopeLock ScopeLock(&SendCriticalSection);
    return m_bConnected;
}

bool  FyzTcpClient::ReadFromInbox(TArray<uint8>& byteArray)
{
    
    bool flag = Inbox.Dequeue(byteArray);
    if (true == flag)
    {

        return true;
    }
    else
    {
        return false;
    }
    return false;
}

void FyzTcpClient::ExecuteOnMessageReceived()
{
    TArray<uint8> msg;
    bool retflag = ReadFromInbox(msg);


    if (true == retflag)
    {
        MessageReceivedByteDelegate.ExecuteIfBound(msg);

        

        FString retStr = Message_ReadString(msg, msg.Num());

        //MessageReceivedStringDelegate.ExecuteIfBound(retStr);

        m_TcpRecvDelegate.ExecuteIfBound(retStr);
        //代码里面就不打印了,蓝图里面打印吧
        //GEngine->AddOnScreenDebugMessage(-1, 5.5f, FColor::Red, FString::Printf(TEXT("get Msg  %s "),*retStr));
    }
    

    
}

void FyzTcpClient::UpdateRecvMessage()
{
    ExecuteOnMessageReceived();


}



void FyzTcpClient::AddToOutbox(TArray<uint8> Message)
{
    Outbox.Enqueue(Message);
}


void FyzTcpClient::Exit()
{

}

bool FyzTcpClient::BlockingSend(const uint8* Data, int32 BytesToSend)
{
    if (BytesToSend > 0)
    {
        int32 BytesSent = 0;
        if (!m_Socket->Send(Data, BytesToSend, BytesSent))
        {
            return false;
        }
    }
    return true;
}

void FyzTcpClient::SocketShutdown()
{
    // if there is still a socket, close it so our peer will get a quick disconnect notification
    if (m_Socket)
    {
        m_Socket->Close();
    }
}


FString FyzTcpClient::Message_ReadString(TArray<uint8>& Message, int32 BytesLength)
{
    if (BytesLength <= 0)
    {

        return FString("");
    }
    if (Message.Num() < BytesLength)
    {

        return FString("");
    }

    TArray<uint8> StringAsArray;
    StringAsArray.Reserve(BytesLength);

    for (int i = 0; i < BytesLength; i++)
    {
        StringAsArray.Add(Message[0]);
        Message.RemoveAt(0);
    }

    std::string cstr(reinterpret_cast<const char*>(StringAsArray.GetData()), StringAsArray.Num());
    return FString(UTF8_TO_TCHAR(cstr.c_str()));
}


TArray<uint8> FyzTcpClient::Conv_StringToBytes(const FString& InStr)
{
    FTCHARToUTF8 Convert(*InStr);
    int BytesLength = Convert.Length(); //length of the utf-8 string in bytes (when non-latin letters are used, it's longer than just the number of characters)
    uint8* messageBytes = static_cast<uint8*>(FMemory::Malloc(BytesLength));
    FMemory::Memcpy(messageBytes, (uint8*)TCHAR_TO_UTF8(InStr.GetCharArray().GetData()), BytesLength); //mcmpy is required, since TCHAR_TO_UTF8 returns an object with a very short lifetime

    TArray<uint8> result;
    for (int i = 0; i < BytesLength; i++)
    {
        result.Add(messageBytes[i]);
    }

    FMemory::Free(messageBytes);

    return result;
}

TArray<uint8> FyzTcpClient::Concat_BytesBytes(TArray<uint8> A, TArray<uint8> B)
{
    TArray<uint8> ArrayResult;

    for (int i = 0; i < A.Num(); i++)
    {
        ArrayResult.Add(A[i]);
    }

    for (int i = 0; i < B.Num(); i++)
    {
        ArrayResult.Add(B[i]);
    }

    return ArrayResult;
}


TArray<uint8> FyzTcpClient::Conv_IntToBytes(int32 InInt)
{
    TArray<uint8> result;
    for (int i = 0; i < 4; i++)
    {
        result.Add(InInt >> i * 8);
    }
    return result;
}


TArray<uint8> FyzTcpClient::Conv_FloatToBytes(float InFloat)
{
    TArray<uint8> result;

    unsigned char const* p = reinterpret_cast<unsigned char const*>(&InFloat);
    for (int i = 0; i != sizeof(float); i++)
    {
        result.Add((uint8)p[i]);
    }
    return result;
}


TArray<uint8> FyzTcpClient::Conv_ByteToBytes(uint8 InByte)
{
    TArray<uint8> result{ InByte };
    return result;
}

int32 FyzTcpClient::Message_ReadInt(TArray<uint8>& Message)
{
    if (Message.Num() < 4)
    {
        //PrintToConsole("Error in the ReadInt node. Not enough bytes in the Message.", true);
        return -1;
    }

    int result;
    unsigned char byteArray[4];

    for (int i = 0; i < 4; i++)
    {
        byteArray[i] = Message[0];
        Message.RemoveAt(0);
    }

    FMemory::Memcpy(&result, byteArray, 4);

    return result;
}
uint8 FyzTcpClient::Message_ReadByte(TArray<uint8>& Message)
{
    if (Message.Num() < 1)
    {
        //PrintToConsole("Error in the ReadByte node. Not enough bytes in the Message.", true);
        return 255;
    }

    uint8 result = Message[0];
    Message.RemoveAt(0);
    return result;
}
bool FyzTcpClient::Message_ReadBytes(int32 NumBytes, TArray<uint8>& Message, TArray<uint8>& ReturnArray)
{
    for (int i = 0; i < NumBytes; i++)
    {
        if (Message.Num() >= 1)
            ReturnArray.Add(Message_ReadByte(Message));
        else
            return false;
    }
    return true;
}
float FyzTcpClient::Message_ReadFloat(TArray<uint8>& Message)
{
    if (Message.Num() < 4)
    {
        //PrintToConsole("Error in the ReadFloat node. Not enough bytes in the Message.", true);
        return -1.f;
    }

    float result;
    unsigned char byteArray[4];

    for (int i = 0; i < 4; i++)
    {
        byteArray[i] = Message[0];
        Message.RemoveAt(0);
    }

    FMemory::Memcpy(&result, byteArray, 4);

    return result;
}




void UNewTcpClient::Setup(FString ip, int port)
{
    m_TcpClient.Connect(ip,port);
}


bool UNewTcpClient::SocketSend(FString message)
{
    
    return true;
}

void UNewTcpClient::SetRecvMsgByteArrayDelegate(const FTcpSocketReceivedByteMessageDelegate& OnMessageReceived)
{
    m_TcpClient.MessageReceivedByteDelegate = OnMessageReceived;
}


void UNewTcpClient::SetRecvMsgStringDelegate(const FTcpSocketReceivedStringMessageDelegate& OnMessageReceived)
{
    //m_TcpClient.MessageReceivedStringDelegate = OnMessageReceived;
    m_TcpClient.m_TcpRecvDelegate = OnMessageReceived;
}

void UNewTcpClient::UpdateRecv()
{
    m_TcpClient.UpdateRecvMessage();
}


void UNewTcpClient::SendStringToServer(FString str)
{
    TArray<uint8> ret = FyzTcpClient::Conv_StringToBytes(str);
    SendBytesToServer(ret);


}


void UNewTcpClient::SendBytesToServer(TArray<uint8> bytes)
{
    m_TcpClient.AddToOutbox(bytes);
}

 

相关文章:

  • 2022-01-24
  • 2021-07-05
  • 2021-12-26
  • 2022-01-21
  • 2021-09-14
  • 2022-01-13
  • 2022-12-23
  • 2021-11-04
猜你喜欢
  • 2022-12-23
  • 2021-11-07
  • 2021-07-07
  • 2021-12-19
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
相关资源
相似解决方案