3种join算法的实现
关系R具有两个属性A和B,其中A和B的属性值均为int型(4个字节),A的值域为[1, 40],B的值域为[1, 1000]。
关系S具有两个属性C和D,其中C和D的属性值均为int型(4个字节)。C的值域为[20, 60],D的值域为[1, 1000]。
1)实现关系选择算法:实现关系选择算法,选出R.A=40或S.C=60的元组,并将结果存放在磁盘上。
2)实现关系投影算法:实现关系投影算法,对关系R上的A属性进行投影,并将结果存放在磁盘上。
3)实现Nested-Loop Join (NLJ)算法:基于ExtMem程序库,使用C语言实现NLJ算法,对关系R和S计算R.A连接S.C,并将结果存放在磁盘上。
import os
import random
import math
class relationship():
def __init__(self,name,properties=["DEFALT_0","DEFALT_1"],minimums=[0,0],maximums=[1,1]):
'''
初始化关系对象
:param name: 关系名
:param properties:属性名的list
:param minimums:属性值域最小值的list
:param maximums:属性值域最大值的list
'''
self.name = name
self.properties = properties
self.minimums = minimums
self.maximums = maximums
#tuples 中存储了属性个list,属性的角标和属性对应list 在tuples中角标相同
self.tuples =[]
self.properties_amount = len(self.properties)
for i in range(len(self.properties)):
self.tuples.append([])
print("关系 " + self.name + " 已成功创建")
def add_tuple(self,property,tuple_value):
'''
向关系中根据属性添加元素
:param property: 属性
:param tuple_value: 元素的值
'''
for i in range(len(self.properties)):
if self.properties[i] == property:
if tuple_value <= self.maximums[i] and tuple_value >= self.minimums[i]:
self.tuples[i].append(tuple_value)
print("%d 已经成功添加到关系 %s 的属性 %s 中"%(tuple_value,self.name,property))
break
else:
print("%d 不在 %d 和 %d 之间"%(tuple_value,self.minimums[i],self.maximums[i]))
break
else:
if i == len(self.properties) - 1:
print("关系 " + self.name + " 中不存在属性 " + property)
def Write_Text(self,file_name, contant):
'''
将string写入指定的文件中
:param file_name: 文件名
:param contant: 待写入的string
:return:
'''
with open(file_name, "w") as f:
f.writelines(contant)
f.writelines("\n")
def write_to_disk(self):
'''
将对象中的信息写入到磁盘中
'''
path = os.path.abspath('.')
disk_path = path + "\disk"
relationship_path = disk_path + "\\" + self.name
if not os.path.exists(disk_path):
os.mkdir(disk_path)
print("创建文件夹 %s"%(disk_path))
if not os.path.exists(relationship_path):
os.mkdir(relationship_path)
print("创建文件夹 %s"%(relationship_path))
relationship.Write_Text(self, relationship_path + "\\" + "properties.txt",str(self.properties))
relationship.Write_Text(self, relationship_path + "\\" + "minimums.txt", str(self.minimums))
relationship.Write_Text(self, relationship_path + "\\" + "maximums.txt", str(self.maximums))
relationship.Write_Text(self, relationship_path + "\\" + "tuples.txt", str(self.tuples))
def Read_Text(self,file_name):
'''
读文件的第一行
:param file_name:待读文件路径
:return: 待读文件内容的第一行
'''
f = open(file_name, "r")
return f.readline()
def read_from_disk(self):
'''
从磁盘中读入该对象名对应的信息并复制给该对象中的字段
'''
path = os.path.abspath('.')
disk_path = path + "\disk"
relationship_path = disk_path + "\\" + self.name
if not os.path.exists(relationship_path):
print("数据库中不存在关系 %s" % (self.name))
else:
self.properties = (relationship.Read_Text(
self,relationship_path + "\\" + "properties.txt").replace("[","").replace("]","")).replace("'","").replace("\n","").split(",")
temp = (relationship.Read_Text(
self,relationship_path + "\\" + "minimums.txt").replace("[","").replace("]","")).replace("'","").replace("\n","").split(",")
self.minimums = [int(i) for i in temp]
temp = (relationship.Read_Text(
self, relationship_path + "\\" + "maximums.txt").replace("[", "").replace("]", "")).replace("'",
"").replace(
"\n", "").split(",")
self.maximums = [int(i) for i in temp]
temp = relationship.Read_Text(self, relationship_path + "\\" + "tuples.txt").split(", [")
temp_0 = []
for i in range(len(temp)):
if i == 0:
temp_0.append(
[int(j) for j in temp[i].replace("[[","").replace("]","").replace("'","").split(",")])
if i == len(temp) - 1:
temp_0.append(
[int(j) for j in temp[i].replace("]]", "").replace("[", "").replace("'", "").split(",")])
self.tuples = temp_0
def select(self,property,value):
'''
根据属性和值选择内容
:param property: 属性
:param value: 值
:return: 结果的list
'''
ending = []
ending.append(self.properties)
for i in range(len(self.properties)):
if self.properties[i] == property:
for j in range(len(self.tuples[i])):
if (self.tuples[i])[j] == value:
temp = []
for k in range(len(self.tuples)):
temp.append((self.tuples[k][j]))
ending.append(temp)
path = os.path.abspath('.')
disk_path = path + "\disk"
select_path = disk_path + "\select_" + property+ "=" + str(value) + "_from_" + self.name + ".txt"
relationship.Write_Text(self,select_path,str(ending))
print("选择%s.%s = %s :"%(self.name,property,str(value)))
print(ending)
print("")
return ending
def project(self,properties):
'''
投影且去重
:param properties:想要投影的属性
:return:投影且去重的结果的list,list中第一个元素为属性名
return example:
[['A'], [24], [31], [26], [28], [40]]
'''
ending = []
temp = []
amount = len(properties)
for i in range(len(properties)):
for j in range(len(self.properties)):
if self.properties[j] == properties[i]:
temp.append(self.tuples[j])
for i in range(len(self.tuples[0])):
temp_in =[]
for j in range(amount):
temp_in.append((temp[j])[i])
ending.append(temp_in)
new_ending = []
new_ending.append(properties)
for id in ending:
if id not in new_ending:
new_ending.append(id)
path = os.path.abspath('.')
disk_path = path + "\disk"
project_path = disk_path + "\project_" + str(properties) + "_from_" + self.name + ".txt"
relationship.Write_Text(self, project_path, str(new_ending))
print_ending = str(self.name) + "."
for i in range(len(properties)):
print_ending = print_ending + properties[i]
print("投影 %s : "%(print_ending))
print(new_ending)
print("")
return new_ending
def get_single_list(self,property):
'''
由属性获得对应的list
:param property: 属性名
:return: 属性名对应的list
'''
for i in range(len(self.properties)):
if property == self.properties[i]:
return self.tuples[i].copy()
def insert_sort(self,property):
'''
升序的插入排序
:param property:排序的依据属性
'''
for i in range(len(self.properties)):
if property == self.properties[i]:
for j in range(len(self.tuples[i])):
for k in range(j):
if self.tuples[i][j] < self.tuples[i][k]:
for l in range(len(self.tuples)):
self.tuples[l].insert(k,self.tuples[l].pop(j))
break
def get_same_position(self,property):
'''
对于已经排序过的list而言,获取同值/单个元素的首个元素位置
:param property: 元素对应的属性
:return: 同值/单个元素的首个元素位置对应的list
'''
ending = []
for i in range(len(self.properties)):
if property == self.properties[i]:
now_value = self.tuples[i][0]
ending.append(0)
for j in range(len(self.tuples[i])):
if now_value != self.tuples[i][j]:
now_value = self.tuples[i][j]
ending.append(j)
return ending
def nest_loop_join(relationships,buffer_sizes,properties):
'''
通过nest loop join算法进行关系的连接
:param relationships: 两个relationship对象构成的list
:param buffer_sizes: 两个relationship各自对应的缓冲区大小构成的list
:param properties: 两个relationship各自对应的连接的属性的list
:return:连接结果的list,list中第一个元素为属性名
return example:
[['A', 'B', 'C', 'D'], [20, 654, 20, 792], [33, 972, 33, 160], [33, 467, 33, 160]]
'''
ending = []
#用copy方法,否则temp和右侧的指向同一地址!!!
temp = relationships[0].properties.copy()
temp_1 = relationships[1].properties
for i in range(len(temp_1)):
temp.append(temp_1[i])
ending.append(temp)
for i in range(len(relationships[0].properties)):
if properties[0] == relationships[0].properties[i]:
list_0 = relationships[0].tuples[i]
for i in range(len(relationships[1].properties)):
if properties[1] == relationships[1].properties[i]:
list_1 = relationships[1].tuples[i]
for i in range(math.ceil((len(list_0)) / buffer_sizes[0])):
for j in range(math.ceil((len(list_1)) / buffer_sizes[1])):
for k in range(buffer_sizes[0]):
for l in range(buffer_sizes[1]):
position_0 = (i * buffer_sizes[0] + k)
position_1 = (j * buffer_sizes[1] + l)
if position_0 < len(list_0) and position_1 < len(list_1):
if list_0[position_0] == list_1[position_1]:
temp_in = []
for i_in in range(len(relationships[0].tuples)):
temp_in.append(((relationships[0].tuples)[i_in])[position_0])
for i_in in range(len(relationships[1].tuples)):
temp_in.append(((relationships[1].tuples)[i_in])[position_1])
ending.append(temp_in)
print("nest loop join 计算 %s.%s 与 %s.%s 连接的结果如下 :"%(
relationships[0].name,properties[0],relationships[1].name,properties[1]))
print(ending)
print("")
return ending
def hash(single_relationship,property,partitions_number=8):
'''
计算指定关系中的属性的每个元组中值的取余hash值,并返回hash值对应的元素在原关系中的角标的list的list
:param single_relationship: 想要计算hash的关系
:param property: 想要计算hash的属性
:param partitions_number: hash时分成的桶数(组数)
:return: hash值对应的元素在原关系中的角标的list的list
return example:
当关系中有20个元组,分成8组时
[[0, 14], [10], [4], [2, 3, 5, 11], [9, 16], [6, 7, 15], [1, 12, 17, 19], [8, 13, 18]]
'''
ending = []
for i in range(partitions_number):
ending.append([])
temp = []
for i in range(len(single_relationship.properties)):
if property == single_relationship.properties[i]:
temp = single_relationship.tuples[i]
break
for i in range(len(temp)):
ending[temp[i] % partitions_number].append(i)
return ending
def hash_join(relationships,properties,partitions_number=8):
'''
通过hash join算法进行关系的连接
:param relationships: 两个relationship对象构成的list
:param properties: 两个relationship各自对应的连接的属性的list
:param partitions_number: hash时分成的桶数(组数)
:return: 连接结果的list,list中第一个元素为属性名
return example:
[['A', 'B', 'C', 'D'], [25, 353, 25, 986], [34, 309, 34, 597]]
'''
ending = []
temp = relationships[0].properties.copy()
temp_1 = relationships[1].properties
for i in range(len(temp_1)):
temp.append(temp_1[i])
ending.append(temp)
list_0 = relationships[0].get_single_list(properties[0])
list_1 = relationships[1].get_single_list(properties[1])
hash_list_0 = hash(relationships[0],properties[0],partitions_number)
hash_list_1 = hash(relationships[1], properties[1], partitions_number)
for i in range(partitions_number):
for j in range(len(hash_list_0[i])):
for k in range(len(hash_list_1[i])):
position_0 = (hash_list_0[i])[j]
position_1 = (hash_list_1[i])[k]
if list_0[position_0] == list_1[position_1]:
temp_in = []
for i_in in range(len(relationships[0].tuples)):
temp_in.append(((relationships[0].tuples)[i_in])[position_0])
for i_in in range(len(relationships[1].tuples)):
temp_in.append(((relationships[1].tuples)[i_in])[position_1])
ending.append(temp_in)
print("hash join 计算 %s.%s 与 %s.%s 连接的结果如下 :" % (
relationships[0].name, properties[0], relationships[1].name, properties[1]))
print(ending)
print("")
return ending
def sorted_join(lists):
'''
计算两个lists中的已经升序排序了的lists[0]和lists[1]值相同的元素在各自的list中的位置
:param lists:两个待连接的list的list
:return:一个list的而原数组,lists[0]的ending[i][0] 位置和lists[1]的ending[i][j] j!=0位置的元素值相同
return example:
[[12, 0, 1], [13, 0, 1], [14, 0, 1], [15, 0, 1], [16, 2], [17, 2], [18, 3, 4, 5, 6, 7], [19, 3, 4, 5, 6, 7]]
'''
ending = []
if lists[0][len(lists[0]) - 1] < lists[1][0]:
ending = None
else:
start = 0
for i in range(len(lists[0])):
start_change_flag = 0
temp = []
for j in range(start,len(lists[1])):
if lists[1][j] == lists[0][i]:
if start_change_flag == 0:
start_change_flag = 1
start = j
temp.append(i)
temp.append(j)
else:
temp.append(j)
if lists[1][j] > lists[0][i]:
break
if not len(temp) == 0:
ending.append(temp)
return ending
def sort_merge_join(relationships,buffer_sizes,properties):
'''
通过sort merge join算法进行关系的连接
:param relationships:两个relationship对象构成的list
:param properties: 两个relationship各自对应的连接的属性的list
:param buffer_sizes:两个relationship各自对应的缓冲区大小构成的list
:return:连接结果的list,list中第一个元素为属性名
return example:
[['A', 'B', 'C', 'D'], [20, 429, 20, 482], [20, 429, 20, 326], [20, 665, 20, 482]]
'''
ending = []
temp = relationships[0].properties.copy()
temp_1 = relationships[1].properties
for i in range(len(temp_1)):
temp.append(temp_1[i])
ending.append(temp)
relationships[0].insert_sort(properties[0])
relationships[1].insert_sort(properties[1])
for i in range(len(relationships[0].properties)):
if properties[0] == relationships[0].properties[i]:
list_0 = relationships[0].tuples[i]
for i in range(len(relationships[1].properties)):
if properties[1] == relationships[1].properties[i]:
list_1 = relationships[1].tuples[i]
lists_0 = []
block_number_0 = math.ceil((len(list_0)) / buffer_sizes[0])
for i in range(block_number_0):
if i == block_number_0 - 1:
lists_0.append(list_0[i * buffer_sizes[0]:len(list_0)])
else:
lists_0.append(list_0[i * buffer_sizes[0]:(i + 1) * buffer_sizes[0]])
lists_1 = []
block_number_1 = math.ceil((len(list_1)) / buffer_sizes[1])
for i in range(block_number_1):
if i == block_number_1 - 1:
lists_1.append(list_1[i * buffer_sizes[1]:len(list_1)])
else:
lists_1.append(list_1[i * buffer_sizes[1]:(i + 1) * buffer_sizes[1]])
list_ending = []
for i in range(block_number_0):
start = 0
start_changed_flag = 0
for j in range(start,block_number_1):
temp_2 = sorted_join([lists_0[i],lists_1[j]])
if temp_2 == None:
break
for k in range(len(temp_2)):
temp_2[k][0] = temp_2[k][0] + i * buffer_sizes[0]
for l in range(len(temp_2[k]) - 1):
temp_2[k][l + 1] = temp_2[k][l + 1] + j * buffer_sizes[1]
if start_changed_flag == 0:
start = j
start_changed_flag = 1
list_ending.append(temp_2)
for i in range(len(list_ending)):
for j in range(len(list_ending[i])):
for k in range(len(list_ending[i][j]) - 1):
temp_in = []
for i_in in range(len(relationships[0].tuples)):
temp_in.append(((relationships[0].tuples)[i_in])[list_ending[i][j][0]])
for i_in in range(len(relationships[1].tuples)):
temp_in.append(((relationships[1].tuples)[i_in])[list_ending[i][j][k + 1]])
ending.append(temp_in)
print("sort merge join 计算 %s.%s 与 %s.%s 连接的结果如下 :" % (
relationships[0].name, properties[0], relationships[1].name, properties[1]))
print(ending)
return ending
if __name__ == '__main__':
R = relationship("R", ["A", "B"], [1, 1], [40, 1000])
S = relationship("S", ["C", "D"], [20, 1], [60, 1000])
for i in range(100):
R.add_tuple("A", random.randint(1, 40))
R.add_tuple("B", random.randint(1, 1000))
S.add_tuple("C", random.randint(20, 60))
S.add_tuple("D", random.randint(1, 1000))
R.select("A", 40)
R.project(["A"])
R.project(["A","B"])
nest_loop_join([R,S],[20,20],["A","C"])
hash_join([R,S],["A","C"],8)
sort_merge_join([R,S],[20,20],["A","C"])
'''
R = relationship("R",["A","B"],[1,1],[40,1000])
S = relationship("S", ["C", "D"], [20, 1], [60, 1000])
R.read_from_disk()
S.read_from_disk()
'''
'''
for i in range(30):
R.add_tuple("A", random.randint(1, 40))
R.add_tuple("B", random.randint(1, 1000))
S.add_tuple("C", random.randint(20, 60))
S.add_tuple("D", random.randint(1, 1000))
R.write_to_disk()
S.write_to_disk()
R.select("A",40)
S.select("C",60)
R.project(["A"])
R.project(["A","B"])
R.properties
'''
'''
extmen = em.ExtMem()
extmen.getNewBlockInBuffer()
extmen.write_to_buffer(nima)
extmen = em.ExtMem()
extmen.getNewBlockInBuffer()
extmen.write_to_buffer(nima)
'''
Written on August 9, 2020