97 lines
		
	
	
		
			2.5 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			97 lines
		
	
	
		
			2.5 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
#!/usr/bin/python3
 | 
						|
# -*- coding: utf-8 -*-
 | 
						|
import asyncio
 | 
						|
import logging
 | 
						|
import json
 | 
						|
 | 
						|
class ClientProtocol(asyncio.Protocol):
    """asyncio protocol that frames the incoming byte stream on NUL bytes.

    Received bytes are accumulated in ``Buffer``. Every complete,
    NUL-terminated chunk is passed to ``master.OnReceive`` (without the
    terminator); an unterminated tail stays buffered until more data
    arrives. ``master.OnDisconnect`` is called when the connection is lost.

    Args:
        loop: Event loop the protocol belongs to (stored, not used here).
        master: Object providing ``OnReceive(chunk)`` and
            ``OnDisconnect(exc)`` callbacks.
    """

    def __init__(self, loop, master):
        self.Loop = loop
        self.Master = master
        self.Transport = None
        self.Buffer = bytearray()

    def connection_made(self, transport):
        """Remember the transport so that ``Send`` can write to it."""
        self.Transport = transport

    def data_received(self, data):
        """Buffer ``data`` and dispatch every complete NUL-terminated chunk."""
        self.Buffer += data

        # split() always yields at least one element; the last one is the
        # unterminated remainder (empty when the buffer ends with NUL), so a
        # separate "ends with NUL" branch is unnecessary. The original test
        # ``data[-1] == b'\0'`` compared an int with bytes and never matched.
        *chunks, remainder = self.Buffer.split(b'\0')
        self.Buffer = remainder

        for chunk in chunks:
            self.Master.OnReceive(chunk)

    def connection_lost(self, exc):
        """Release the transport and report the disconnect to the master."""
        if self.Transport is not None:
            self.Transport.close()
            self.Transport = None
        self.Master.OnDisconnect(exc)

    def Send(self, data):
        """Write ``data`` to the transport; silently does nothing when closed."""
        if self.Transport:
            self.Transport.write(data)
 | 
						|
 | 
						|
class AsyncClient:
    """JSON request/response client on top of ``ClientProtocol``.

    Requests are serialized one at a time under ``SendLock``: ``Send``
    writes a JSON document and waits on ``RecvFuture`` for the next
    non-publish message. Messages whose ``"method"`` is ``"publish"`` are
    forwarded to ``master.OnPublish`` instead.

    Args:
        loop: Event loop used to open the connection and create futures.
        host: Remote host name or address.
        port: Remote TCP port.
        master: Object providing ``OnPublish(obj)`` and
            ``OnDisconnect(exc)`` callbacks.
    """

    def __init__(self, loop, host, port, master):
        self.Logger = logging.getLogger(__class__.__name__)
        self.Loop = loop
        self.Host = host
        self.Port = port
        self.Master = master

        self.Protocol = None
        self.SendLock = asyncio.Lock()
        self.RecvFuture = None

    async def Connect(self):
        """Retry once per second until a connection is established."""
        while True:
            self.Logger.warning("Reconnecting...")
            try:
                _, self.Protocol = await self.Loop.create_connection(
                    lambda: ClientProtocol(self.Loop, self),
                    host=self.Host,
                    port=self.Port,
                )
            except asyncio.CancelledError:
                # Never swallow cancellation, otherwise this loop cannot be
                # stopped (CancelledError is an Exception subclass on 3.7).
                raise
            except Exception:
                self.Logger.debug(
                    "Connection to %s:%s failed", self.Host, self.Port,
                    exc_info=True,
                )
                await asyncio.sleep(1.0)
            else:
                return

    def OnReceive(self, data):
        """Decode one JSON message and route it.

        Publish notifications go to ``master.OnPublish``; anything else
        resolves the pending request, if one is still waiting.
        """
        Obj = json.loads(data)

        if isinstance(Obj, dict) and Obj.get("method") == "publish":
            self.Master.OnPublish(Obj)
            return

        pending = self.RecvFuture
        # A second reply for an already-resolved request would otherwise
        # raise InvalidStateError from set_result().
        if pending is not None and not pending.done():
            pending.set_result(Obj)

    def OnDisconnect(self, exc):
        """Drop the protocol, release a waiting ``Send`` and notify the master.

        A pending request is resolved with ``None`` so that ``Send`` returns
        ``None`` instead of raising ``CancelledError``. The master is
        notified on every disconnect, not only while a request is pending.
        """
        self.Protocol = None

        pending = self.RecvFuture
        if pending is not None and not pending.done():
            pending.set_result(None)

        self.Master.OnDisconnect(exc)

    async def Send(self, obj):
        """Send ``obj`` as compact UTF-8 JSON and wait for the reply.

        Returns:
            The decoded reply, or ``None`` when there is no connection or
            the connection is lost before a reply arrives.
        """
        if not self.Protocol:
            return None

        # NOTE(review): no NUL terminator is appended although the receive
        # side frames on NUL - confirm the peer does not expect one.
        Data = json.dumps(
            obj, ensure_ascii=False, separators=(',', ':')
        ).encode("UTF-8")

        async with self.SendLock:
            # The connection may have dropped while waiting for the lock.
            if not self.Protocol:
                return None

            pending = self.Loop.create_future()
            self.RecvFuture = pending
            try:
                self.Protocol.Send(Data)
                return await pending
            finally:
                # Always clear the slot, also on cancellation or error, so
                # later messages are not routed to a stale future.
                if self.RecvFuture is pending:
                    self.RecvFuture = None
 |