1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package fulmine.distribution.connection.tcp;
17
18 import static fulmine.util.Utils.EMPTY_STRING;
19 import static fulmine.util.Utils.nullCheck;
20
21 import java.net.InetAddress;
22 import java.net.UnknownHostException;
23
24 import fulmine.context.IFrameworkContext;
25 import fulmine.context.INetwork;
26 import fulmine.distribution.IHeartbeatMonitor;
27 import fulmine.distribution.channel.Channel;
28 import fulmine.distribution.channel.IChannel;
29 import fulmine.distribution.connection.IConnection;
30 import fulmine.distribution.connection.IConnectionBroker;
31 import fulmine.distribution.connection.IConnectionDiscoverer;
32 import fulmine.util.log.AsyncLog;
33
34
35
36
37
38
39 public class TcpNetwork implements INetwork
40 {
41
42 private final String hostAddress;
43
44
45 private final int tcpPort;
46
47
48 private final int udpPort;
49
50
51 private final String udpNetwork;
52
53
54 private final String udpNic;
55
56
57 private IFrameworkContext context;
58
59
60 private long heartbeatPeriod = IHeartbeatMonitor.DEFAULT_HEARTBEAT_PERIOD;
61
62
63 private int allowedMissedHeartbeats =
64 IHeartbeatMonitor.DEFAULT_ALLOWED_MISSED_COUNT;
65
66
67 private boolean listeningOnlyMode;
68
69 private final static AsyncLog LOG = new AsyncLog(TcpNetwork.class);
70
71
72
73
74
75
76
77
78
79
80
81 public TcpNetwork()
82 {
83 this(System.getProperty(TcpConnectionBroker.CONTEXT_IP_ADDRESS,
84 TcpConnectionBroker.HOST_NAME),
85 Integer.parseInt(System.getProperty(
86 TcpConnectionBroker.CONTEXT_TCP_PORT, EMPTY_STRING
87 + TcpConnectionBroker.DEFAULT_CONTEXT_TCP_PORT)),
88 System.getProperty(TcpConnectionDiscoverer.UDP_DISCOVERY_NETWORK,
89 TcpConnectionDiscoverer.DEFAULT_UDP_DISCOVERY_NETWORK),
90 Integer.parseInt(System.getProperty(
91 TcpConnectionDiscoverer.UDP_DISCOVERY_PORT,
92 TcpConnectionDiscoverer.DEFAULT_UDP_DISCOVERY_PORT)),
93 System.getProperty(TcpConnectionDiscoverer.NETWORK_INTERFACE_NAME));
94 }
95
96
97
98
99
100
101
102
103
104
105
106
107
108 public TcpNetwork(String address, int port)
109 {
110 this(address, port, System.getProperty(
111 TcpConnectionDiscoverer.UDP_DISCOVERY_NETWORK,
112 TcpConnectionDiscoverer.DEFAULT_UDP_DISCOVERY_NETWORK),
113 Integer.parseInt(System.getProperty(
114 TcpConnectionDiscoverer.UDP_DISCOVERY_PORT,
115 TcpConnectionDiscoverer.DEFAULT_UDP_DISCOVERY_PORT)),
116 System.getProperty(TcpConnectionDiscoverer.NETWORK_INTERFACE_NAME));
117 }
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141 public TcpNetwork(String hostAddress, int tcpPort, String udpNetwork,
142 int udpPort, String udpNic)
143 {
144 super();
145 nullCheck(udpNetwork, "UDP network is null");
146 try
147 {
148 this.hostAddress =
149 hostAddress == null
150 ? (InetAddress.getLocalHost().getCanonicalHostName())
151 : hostAddress;
152 }
153 catch (UnknownHostException e)
154 {
155 throw new RuntimeException("Could not get host name", e);
156 }
157 this.tcpPort = tcpPort;
158 this.udpNetwork = udpNetwork;
159 this.udpPort = udpPort;
160 this.udpNic = udpNic;
161 }
162
163 public IConnectionBroker createBroker()
164 {
165 return new TcpConnectionBroker(context, this.hostAddress, this.tcpPort);
166 }
167
168 public IChannel createChannel(IConnection connection)
169 {
170 if (LOG.isDebugEnabled())
171 {
172 LOG.debug("Creating channel over " + connection);
173 }
174 return new Channel(connection, context);
175 }
176
177 public IConnectionDiscoverer createDiscoverer()
178 {
179 final TcpConnectionDiscoverer tcpConnectionDiscoverer =
180 new TcpConnectionDiscoverer(context, this.hostAddress,
181 this.tcpPort, this.udpNetwork, this.udpPort, this.udpNic);
182 tcpConnectionDiscoverer.setNetworkHeartbeatPeriod(this.heartbeatPeriod);
183 tcpConnectionDiscoverer.setAllowableNetworkHeartbeatMissCount(this.allowedMissedHeartbeats);
184 return tcpConnectionDiscoverer;
185 }
186
187 public void setContext(IFrameworkContext context)
188 {
189 this.context = context;
190 }
191
192 public void setAllowableNetworkHeartbeatMissCount(
193 int allowedHeartbeatMissCount)
194 {
195 this.allowedMissedHeartbeats = allowedHeartbeatMissCount;
196 }
197
198 public void setNetworkHeartbeatPeriod(long periodInMillis)
199 {
200 this.heartbeatPeriod = periodInMillis;
201 }
202
203 public int getAllowableNetworkHeartbeatMissCount()
204 {
205 return this.allowedMissedHeartbeats;
206 }
207
208 public long getNetworkHeartbeatPeriod()
209 {
210 return this.heartbeatPeriod;
211 }
212
213 public void setListeningOnlyMode(boolean listeningOnlyMode)
214 {
215 this.listeningOnlyMode = true;
216 }
217
218 public boolean isListeningOnlyMode()
219 {
220 return this.listeningOnlyMode;
221 }
222 }