【发布时间】:2022-01-10 14:28:43
【问题描述】:
我是 Pyspark 的初学者,我正在尝试将成批的流式 JSON 字符串预处理为可以输入机器学习模型的格式,唯一的限制是模型必须随着流式数据而增量学习无法存储。
以下是对我目前进展的解释:
我有 twitter-sentiment 数据集,已清理并存储为本地系统中的 .csv 文件。每条记录有两个特点:
- 情绪 --- > 这是一个单一的数字:0 表示负面,4 表示正面。
- 文本 ---------- > 推文的文本。
下面提到的 python 程序将该数据集作为 JSON 字符串批量传输到 TCP 端口。我已将 .csv 文件和批处理大小指定为命令行参数。
注意:我只包含了必要的代码。
Stream.py:
For TCP Connection:
TCP_IP = "localhost"
TCP_PORT = 6100
def connectTCP():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((TCP_IP, TCP_PORT))
s.listen(1)
print(f"Waiting for connection on port {TCP_PORT}...")
connection, address = s.accept()
print(f"Connected to {address}")
return connection, address
将数据字符串作为 json 流式传输到指定端口的代码:
def streamCSVFile(tcp_connection, input_file):
df = pd.read_csv(input_file)
values = df.values.tolist()
# loop through batches of size batch_size lines
for i in tqdm(range(0, len(values) - batch_size + 2, batch_size)):
send_data = values[i:i + batch_size] # load batch of rows
payload = dict() # create a payload
# iterate over the batch
for mini_batch_index in range(len(send_data)):
payload[mini_batch_index] = dict() # create a record
# iterate over the features
for feature_index in range(len(send_data[0])):
# add the feature to the record
payload[mini_batch_index][f'feature{feature_index}'] = send_data[mini_batch_index][feature_index]
send_batch = (json.dumps(payload) + '\n').encode()
try:
tcp_connection.send(send_batch) # send the payload to Spark
except BrokenPipeError: # this indicates that the message length of the payload is more than what is allowed via TCP
print("Either batch size is too big for the dataset or the connection was closed")
except Exception as error_message:
print(f"Exception thrown but was handled: {error_message}")
time.sleep(5)
下面是我编写的用于接收流式 JSON 并在终端上打印的简单代码。
rec.py:
import socket
import json
HOST = 'localhost'
PORT = 6100
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
data = s.recv(1024)
print(repr(data))
s.close()
上面的代码工作正常,但我想要实现的是从流中获取批量读取的 JSON 对象,并将该对象转换为我可以执行转换的 rdd 或数据帧。 (例如过滤标签值并按它们分组)。
数据集截图(供参考):
【问题讨论】:
标签: python json apache-spark pyspark