hdfs在保存文件时,会将一个文件拆分成多个block,block的大小默认是128M。一个block既不会多出一个字节,也不会少一个字节,而是刚好128*1024
*1024个字节,这就引出了一个平常不会注意的细节,对于文本文件,数据是以行的形式存在的,如果一个block的大小固定为128M,那么hdfs是怎么做到的呢?
一定存在,而且几乎每一个block都存在这种情况,文件的最后一行并不完整,最后一行数据被切分成了两部分,前面的部分在当前的block里,后面的部分在下一个block里,只有这样,才能保证一个block的大小刚好是128M。
运行在hadoop上的spark程序在读取数据时,可以并行读取block的数据,当前block读取结束后,必须读取下一个block的第一行数据,这样才能读取当前block最后一行的完整数据。
处于兴趣,我打算写一个可以模型hdfs保存文件的程序,为了确保文件的大小是固定的,必须向文件里写入字节串,这样才能精确的计算写入数据的大小。当一个block写满时,则创建一个新的block文件继续写入,最后一行数据如果不能都写入一个block,则将其拆分到两个block中。
python 文件对象提供的write方法只接收字符串,但文件对象还有一个buffer对象,它可以写入字节串,使用字节串,才能精确地计算写入数据的大小,控制一个block的大小。
class SplitFile():
def __init__(self, filename, block_szie):
self.filename = filename # 文件名称
self.file_index = 0 # 文件索引
self.block_size = block_szie # 文件块大小
self.curr_file_handler = None # 文件句柄
self._set_new_handler()
def _get_filename(self):
file_name = f"{self.filename}.{self.file_index}"
self.file_index += 1
return file_name
def _set_new_handler(self):
if self.curr_file_handler is None:
self.curr_file_handler = BlockFile(self._get_filename(), self.block_size)
else:
if not self.curr_file_handler.is_writeable():
self.curr_file_handler.close()
self.curr_file_handler = BlockFile(self._get_filename(), self.block_size)
def write(self, data):
"""
写文件
:param data:
:return:
"""
self._set_new_handler()
if isinstance(data, str):
if not data.endswith('\n'):
data += '\n'
data = data.encode(encoding='utf-8')
elif isinstance(data, bytes):
if not data.endswith(b'\n'):
data += b'\n'
else:
raise
surplus = self.curr_file_handler.write(data)
if surplus:
self.write(surplus)
def close(self):
self.curr_file_handler.close()
class BlockFile():
def __init__(self, filename, block_size):
self.filename = filename # 块文件名称
self.block_size = block_size # 块大小
self.writen_size = 0 # 已经写的字节数
self._create_file()
def is_writeable(self): # 是否可写
return self.block_size > self.writen_size
def _create_file(self):
self.file = open(self.filename, 'w', encoding='utf-8') # 文本模式打开
def write(self, data):
"""
写文件
:param data:
:return:
"""
data_length = len(data)
writeable_length = self.block_size - self.writen_size
surplus = b''
if writeable_length >= data_length:
self.file.buffer.write(data)
else:
surplus = data[writeable_length:]
self.file.buffer.write(data[0:writeable_length])
self.writen_size += data_length
return surplus
def close(self):
self.file.close()
def test_write():
sf = SplitFile('1.txt', 100)
for i in range(30):
sf.write('我是一行数据')
sf.close()
def test_read():
with open('1.txt.0', 'r', encoding='utf-8')as f:
lines = f.buffer.readlines()
if not lines[-1].endswith(b'\n'):
with open('1.txt.1', 'r', encoding='utf-8')as f:
cut_line = f.buffer.readline()
lines[-1] += cut_line
for line in lines:
print(line.decode(encoding='utf-8'), end='')
if __name__ == '__main__':
test_write()
test_read()
1.txt.0文件内容
我是一行数据
我是一行数据
我是一行数据
我是一行数据
我是一行数据
我�
1.txt.1文件内容
�一行数据
我是一行数据
我是一行数据
我是一行数据
我是一行数据
我是一�
乱码是因为一个汉字由3个字节构成,拆开以后不能正常显示,test_read函数读取1.txt.1 的第一行补齐1.txt.0最后一行,实测可行。
QQ交流群: 211426309