-
Notifications
You must be signed in to change notification settings - Fork 0
/
cluster_spotter.py
executable file
·194 lines (164 loc) · 5.95 KB
/
cluster_spotter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
"""
Parse DX cluster telnet spots and print to STDOUT on:
* Match against pre-defined list of watched calls.
* Match against missing DXCC entities in ClubLog.
"""
import sys
import telnetlib
import time
import re
from urllib.request import urlopen
import json
from configparser import ConfigParser
import argparse
"""
Class for keeping track over the time since the last time
a specific country and band were spotted.
"""
class spot_timekeeper:
def __init__(self):
# Map over the times at which a specific (country code, band) were spotted
self.spot_times = {}
# Time threshold
threshold_seconds = 60*60
"""
Check whether the time since the last spot is large enough for reporting a
new spot for this country and band. Updates the spot time if successful.
Parameters
----------
country_code:
Country code
band:
Band
Returns
-------
exceeds_threshold: boolean
True if time difference exceeds the threshold, false otherwise
"""
def exceeds_threshold(self, country_code, band):
curr_time = time.time()
if (curr_time - self.spot_times.get((country_code, band), 0) > self.threshold_seconds):
self.spot_times[(country_code, band)] = curr_time
return True
return False;
"""
Map frequencies to ham bands between 160 and 6 m.
Parameters
----------
freq: float
Frequency in kHz
Returns
-------
band: float
Ham radio band (e.g. 80 for 80 meters)
"""
def frequency_to_band(freq):
if 1810.0<frequency<2000.0:
band = "160"
elif 3500.0<frequency<3800.0:
band = "80"
elif 5260.0<frequency<5410.0:
band = "60"
elif 7000.0<frequency<7200.0:
band = "40"
elif 10100.0<frequency<10150.0:
band = "30"
elif 14000.0<frequency<14350:
band = "20"
elif 18068.0<frequency<18168.0:
band = "17"
elif 21000.0<frequency<21450.0:
band = "15"
elif 24740.0<frequency<24990.0:
band = "12"
elif 28000.0<frequency<29700.0:
band = "10"
elif 50000.0<frequency<52000.0:
band = "6"
else:
band = None
return band
"""
Map callsign to DXCC number, locator, .. using the ClubLog API.
Parameters
----------
callsign: str
Callsign
api_key: str
ClubLog API key
"""
def query_dxcc_info(callsign, api_key):
return json.load(urlopen("https://secure.clublog.org/dxcc?call=%s&api=%s&full=1" % (callsign,api_key)))
"""
Check if the DXCC has already been run on the specified band.
Parameters
----------
dxcc: str
DXCC number
band: str
Band (e.g. 80 for 80 meters)
matrix_filename: str
Filename for JSON structure containing DXCC information obtained from ClubLog
"""
def dxcc_in_matrix(dxcc, band, matrix_filename):
try:
with open(matrix_filename) as dxcc_json_data:
dxcc_data = json.load(dxcc_json_data)
dxcc_data[dxcc][band]
return True
except KeyError:
return False
# CLI arguments
parser = argparse.ArgumentParser(description='Print watched spots and missing DXCC entities in DX cluster stream to STDOUT.')
parser.add_argument('config_file', metavar='CONFIG_FILE', type=str,
help='Path to config file (see README.md)')
parser.add_argument('-v', dest='verbose', action='store_true',
help='Verbose output (print full DX cluster output to STDERR)')
args = parser.parse_args()
# Read config file
config = ConfigParser()
config.readfp(open(args.config_file, "r"))
# JSON DXCC matrix filename
SECTION = "spotter"
dxcc_matrix_filename = config.get(SECTION, "dxcc_matrix_filename")
# Clublog API key
api_key = config.get(SECTION, "clublog_api_key")
# Obtain list of callsigns we want to spot in addition to missing calls in the DXCC matrix
watched_callsigns = config.get(SECTION, "watched_callsigns").split()
# Spam filter: keep track over the last spot
# time of specific country and band
time_since_last_report = spot_timekeeper()
# Open connection to telnet
tn = telnetlib.Telnet(config.get(SECTION, "cluster_host"), config.get(SECTION, "cluster_port"))
tn.read_until(b":")
tn.write((config.get(SECTION, "callsign") + "\n").encode('ascii'))
# Define regular expressions for obtaining callsign, frequency etc from a spot
callsign_pattern = "([a-z|0-9|/]+)"
frequency_pattern = "([0-9|.]+)"
pattern = re.compile("^DX de "+callsign_pattern+":\s+"+frequency_pattern+"\s+"+callsign_pattern+"\s+(.*)\s+(\d{4}Z)", re.IGNORECASE)
# Parse DXCC cluster stream
while (1):
# Obtain new spotted call
telnet_output = tn.read_until(b"\n")
match = pattern.match(telnet_output.decode('ascii'))
if args.verbose:
print(telnet_output.decode('ascii'), file=sys.stderr)
# If there is a match, sort matches into variables
if match:
spotter = match.group(1)
frequency = float(match.group(2))
spotted = match.group(3)
comment = match.group(4).strip()
spot_time = match.group(5)
band = frequency_to_band(frequency)
# Get DXCC information from clublog API
spotter_data = query_dxcc_info(spotter, api_key)
spotted_data = query_dxcc_info(spotted, api_key)
spotted_dxcc_route = str(spotted_data["DXCC"])
#note: spotted_data also contains coordinates of the spotted callsign which can be used for further filtering.
# Report the call if it has not been worked before
if band and spotted_dxcc_route and time_since_last_report.exceeds_threshold(spotted_dxcc_route, band) and not dxcc_in_matrix(spotted_dxcc_route, band, dxcc_matrix_filename):
print("New DXCC! {} ({}) at {} by {} ({} - {}) {}".format(spotted,spotted_data["Name"],frequency,spotter,spotter_data["Name"], comment, spot_time))
# Compare callsign against watched list of callsigns for which we are reporting spots regardless of DXCC matrix
if any(x in spotted for x in watched_callsigns):
print("{} at {} by {} ({}) {}".format(spotted,frequency,spotter,comment,spot_time))