Anti-entropy and Gossiping

题目

N个节点,使用P2P方法,分别写出使用anti-entropy和gossiping传播模型的求分布式平均数算法。

分析

anti-entropy是基于gossip的一种流行的传播模型,在模型中,节点P随机选择另一节点Q,然后交换更新信息,交换更新信息的方法有三种:push、pull和push-pull,书上说push-pull最好,因此就使用push-pull的方法。
gossiping也叫rumor spreading(流言传播),是anti-entropy的一个特殊变体。区别就是P与Q通信时如果发现Q已经被另外的节点更新了,那么P可能不再传播该信息的概率为1/k,比如k=4时遇到这种情况,下一次仍然传播的概率就变为3/4。

这是分布式系统的作业,要求用java实现,每个算法写一个java,目的就是模拟一下使用上述两种算法分布式节点求平均数,所以简单写一写就可以了,要求不高。
Anti-entropy的算法是先写的,所以还有些问题,循环用的比较粗糙~

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
//antientropy.java

import java.util.ArrayList;

//anti-entropy push&pull
public class antientropy {
//定义N个节点,Node类在gossiping中已经定义了
public static int N = 1000;

public static void main(String[] args) {
//记录节点之和
int sum=0;
//记录感染个数
int infected=0;
//声明节点数组
ArrayList<Node> list=new ArrayList<Node>();
//记录信息交换的二维数组,0表示i、j两个节点未交换信息
int[][] state = new int[N][N];

//初始化:给节点数组增加N个元素
for(int i=0;i <N;i++ ) {
//节点的值大小为0到10
list.add(i,new Node((float)(Math.random()*10),false,100));
//System.out.println("节点"+i+"="+list.get(i).getValue());
sum+=list.get(i).getValue();
for(int j=0;j<N;j++){
//设置i、j未交换信息
state[i][j]=0;
}
}

//选取传播节点P
int p=(int)(Math.random()*N);
list.get(p).setInfected(true);
infected++;
System.out.println("由"+p+"开始传播——anti-entrpy");

while(infected<N){
//每轮选取一个交换信息的节点P
p=(int)(Math.random()*N);

for(int i=0;i<list.size();i++){
int q=(int)(Math.random()*N);
if(p==q)
continue;
//若都是感染者或都不是也跳过
if((list.get(p).isInfected()&&list.get(q).isInfected()) || (!list.get(p).isInfected()&&!list.get(q).isInfected()))
continue;
//若P与J未交换且其中一者已感染则交换
if(state[p][q]==0&&state[q][p]==0){
System.out.print(p+"与"+q+"交换信息|");
//双方分别取平均值
list.get(p).setValue((list.get(p).getValue()+list.get(q).getValue())/2);
list.get(q).setValue(list.get(p).getValue());
list.get(p).setInfected(true);list.get(q).setInfected(true);//确保都变为已感染
infected++;
state[p][q]=1;//表示i、j交换过
}
System.out.println("感染率:"+(float)infected/N);
}
}
System.out.println("感染结束,平均值为:"+(float)list.get(p).getValue());
System.out.println(N+"个节点实际平均值为:"+(float)sum/N);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
//gossiping.java

import java.util.ArrayList;

//gossiping push&pull
public class gossiping {
//定义N个节点
public static int N = 10000;

public static void main(String[] args) {
//记录节点之和
int sum=0;
//声明节点数组
ArrayList<Node> list=new ArrayList<Node>();
//记录信息交换的二维数组,0表示i、j两个节点未交换信息
int[][] state = new int[N][N];
//初始化:给节点数组增加N个元素
for(int i=0;i <N;i++ ) {
//节点的值大小为0到10的整数
list.add(i,new Node((float)(Math.random()*10),false,100));
//System.out.println("节点"+i+"="+list.get(i).getValue());
sum+=list.get(i).getValue();
for(int j=0;j<N;j++){
//设置i、j未交换信息
state[i][j]=0;
}
}
//选取传播节点P
int p=(int)(Math.random()*list.size());
list.get(p).setInfected(true);
System.out.println("由"+p+"开始传播——gossiping");

while(list.get(0).getValue()!=list.get(1).getValue()){
//每轮选取一个交换信息的节点P
p=(int)(Math.random()*list.size());
//根据传染率确定是否继续传染
if(Math.random()*100>list.get(p).getRate())
continue;

for(int i=0;i<list.size();i++){
int q=(int)(Math.random()*list.size());
if(p==q)
continue;
//都是感染者则降低双方传染率然后跳过,k取4
if((state[p][q]==1||state[q][p]==1) && list.get(p).getValue()==list.get(q).getValue()){
list.get(p).setRate((int)(list.get(p).getRate()*0.75));
if(list.get(p).getRate()<25)
list.remove(p);
continue;
}
//若P与J未交换且其中一者已感染则交换
if(list.get(p).getValue()!=list.get(q).getValue()){
System.out.println(p+"与"+q+"交换信息");
//双方分别取平均值
list.get(p).setValue((list.get(p).getValue()+list.get(q).getValue())/2);
list.get(q).setValue(list.get(p).getValue());
state[p][q]=1;state[q][p]=1;//设置两节点交换过信息
list.get(p).setInfected(true);list.get(q).setInfected(true);//确保都变为已感染
}
}
}
System.out.println("感染结束,平均值为:"+(float)list.get(0).getValue());
System.out.println(N+"个节点实际平均值为:"+(float)sum/N);
}
}

class Node{
private float value;//节点的值
private boolean infected;//是否感染,true为是
private int rate;//传染率降到0时则已隔离,仅对Gossiping有用

public Node(float value, boolean infected, int rate) {
this.value=value;
this.infected=infected;
this.rate=rate;
}
public float getValue() {
return value;
}
public void setValue(float value) {
this.value = value;
}
public boolean isInfected() {
return infected;
}
public void setInfected(boolean infected) {
this.infected = infected;
}
public int getRate() {
return rate;
}
public void setRate(int rate) {
this.rate = rate;
}
}