Skip to content

Commit 1fad8b6

Browse files
authored
Java Loggers (#939)
* Getting skeletons for Writers and writable in. Will need to think through arguments passing in Java * Logs a hashmap. Python has a generic object, we need to implement this as well * Adds TransientLogger * Finishes TransientLogger after testing and debugging. TimedRollingLogger will need to be tested after Writers are done. * Renames folder to match google style guide and linter
1 parent 7511e8b commit 1fad8b6

File tree

13 files changed

+460
-5
lines changed

13 files changed

+460
-5
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,85 @@
11
package com.whylogs.api.logger;
22

3-
public class Logger {}
3+
import com.whylogs.api.logger.resultsets.ProfileResultSet;
4+
import com.whylogs.api.logger.resultsets.ResultSet;
5+
import com.whylogs.api.writer.Writer;
6+
import com.whylogs.api.writer.WritersRegistry;
7+
import com.whylogs.core.DatasetProfile;
8+
import com.whylogs.core.schemas.DatasetSchema;
9+
import java.util.ArrayList;
10+
import java.util.HashMap;
11+
import java.util.Map;
12+
import lombok.EqualsAndHashCode;
13+
import lombok.Getter;
14+
import lombok.NoArgsConstructor;
15+
import lombok.ToString;
16+
17+
@NoArgsConstructor
18+
@Getter
19+
@EqualsAndHashCode
20+
@ToString
21+
public abstract class Logger implements AutoCloseable {
22+
private boolean isClosed = false;
23+
private DatasetSchema schema = new DatasetSchema();
24+
private ArrayList<Writer> writers = new ArrayList<>();
25+
26+
public Logger(DatasetSchema schema) {
27+
this.schema = schema;
28+
}
29+
30+
public <T extends Writer> void checkWriter(T Writer) {
31+
// Checks if a writer is configured correctly for this class
32+
// Question: why is this empty but not an abstract?
33+
}
34+
35+
public void appendWriter(String name) {
36+
if (name == null || name.isEmpty()) {
37+
throw new IllegalArgumentException("Writer name cannot be empty");
38+
}
39+
40+
Writer writer = WritersRegistry.get(name);
41+
if (writer == null) {
42+
throw new IllegalArgumentException("Writer " + name + " is not registered");
43+
}
44+
45+
appendWriter(writer);
46+
}
47+
48+
public void appendWriter(Writer writer) {
49+
if (writer == null) {
50+
throw new IllegalArgumentException("Writer cannot be null");
51+
}
52+
53+
checkWriter(writer);
54+
writers.add(writer);
55+
}
56+
57+
protected abstract ArrayList<DatasetProfile> getMatchingProfiles(Object data);
58+
59+
protected abstract <O> ArrayList<DatasetProfile> getMatchingProfiles(Map<String, O> data);
60+
61+
@Override
62+
public void close() {
63+
isClosed = true;
64+
}
65+
66+
public ResultSet log(HashMap<String, Object> data) {
67+
// What type of data is the object? Right now we don't process that in track.
68+
if (isClosed) {
69+
throw new IllegalStateException("Logger is closed");
70+
} else if (data == null) {
71+
throw new IllegalArgumentException("Data cannot be null");
72+
}
73+
74+
// TODO: implement segment processing here
75+
76+
ArrayList<DatasetProfile> profiles = getMatchingProfiles(data);
77+
for (DatasetProfile profile : profiles) {
78+
profile.track(data);
79+
}
80+
81+
// Question: Why does this only return the first profile? IS this
82+
// getting ready for multiple profiles later on?
83+
return new ProfileResultSet(profiles.get(0));
84+
}
85+
}
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,32 @@
11
package com.whylogs.api.logger;
22

3-
public class TransientLogger {}
3+
import com.whylogs.core.DatasetProfile;
4+
import com.whylogs.core.schemas.DatasetSchema;
5+
import java.util.ArrayList;
6+
import java.util.Map;
7+
import lombok.*;
8+
9+
@NoArgsConstructor
10+
@Getter
11+
@EqualsAndHashCode(callSuper = false)
12+
@ToString
13+
public class TransientLogger extends Logger {
14+
public TransientLogger(DatasetSchema schema) {
15+
super(schema);
16+
}
17+
18+
@Override
19+
protected ArrayList<DatasetProfile> getMatchingProfiles(Object data) {
20+
// In this case, we don't have any profiles to match against
21+
ArrayList<DatasetProfile> profiles = new ArrayList<>();
22+
DatasetProfile profile = new DatasetProfile(getSchema());
23+
profiles.add(profile);
24+
return profiles;
25+
}
26+
27+
@Override
28+
protected <O> ArrayList<DatasetProfile> getMatchingProfiles(Map<String, O> data) {
29+
// In this case, we don't have any profiles to match against
30+
return getMatchingProfiles((Object) data);
31+
}
32+
}

java/core/src/main/java/com/whylogs/api/logger/resultsets/ProfileResultSet.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
public class ProfileResultSet extends ResultSet {
1313
@NonNull private final DatasetProfile profile;
1414

15-
public ProfileResultSet(DatasetProfile profile) {
15+
public ProfileResultSet(@NonNull DatasetProfile profile) {
1616
super();
1717
this.profile = profile;
1818
}

java/core/src/main/java/com/whylogs/api/logger/resultsets/ResultSet.java

-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ public abstract class ResultSet {
2828
public abstract Optional<DatasetProfile> profile();
2929

3030
// TODO: Come back for ModelPerformanceMetrics
31-
3231
public void addMetric(String name, Metric<?> metric) throws Error {
3332
DatasetProfile profile =
3433
this.profile()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package com.whylogs.api.logger.rollingLogger;
2+
3+
import java.util.concurrent.Executors;
4+
import java.util.concurrent.ScheduledExecutorService;
5+
import java.util.concurrent.TimeUnit;
6+
import lombok.EqualsAndHashCode;
7+
import lombok.Getter;
8+
import lombok.ToString;
9+
10+
@Getter
11+
@EqualsAndHashCode
12+
@ToString
13+
/**
 * Runs a callback repeatedly on a single background thread: first after {@code initial} seconds,
 * then every {@code interval} seconds.
 *
 * <p>The schedule is started by the constructor. {@link #stop()} shuts the executor down and
 * {@link #start()} may be called again afterwards to resume with a fresh executor.
 */
public class Scheduler {
  // Executor backing the current schedule; replaced each time the scheduler is (re)started.
  private ScheduledExecutorService scheduledService;
  // Delay in seconds before the very first execution.
  private float initial;
  // Whether the initial delay has already been consumed by a previous start().
  private boolean ranInitial = false;
  // Period in seconds between executions.
  private float interval;
  private Runnable func;
  private boolean isRunning = false;
  private String[] args;
  // TODO: figure out args and kwargs

  /**
   * Creates the scheduler and starts it immediately.
   *
   * @param initial delay in seconds before the first execution (negative values are treated as 0)
   * @param interval period in seconds between executions; must be at least one millisecond
   * @param func callback to run on every tick
   * @param args currently unused; stored for a future argument-passing feature
   * @throws IllegalArgumentException if {@code func} is null or {@code interval} is not positive
   */
  public Scheduler(float initial, float interval, Runnable func, String[] args) {
    if (func == null) {
      throw new IllegalArgumentException("func cannot be null");
    }
    this.initial = initial;
    this.interval = interval;
    this.func = func;
    this.args = args;
    this.start();
  }

  /** Body of each tick. The fixed-rate executor re-arms itself, so nothing is rescheduled here. */
  private void run() {
    this.func.run(); // TODO: figure out args and kwargs
  }

  /**
   * Starts the periodic schedule; does nothing if it is already running.
   *
   * <p>The first start waits {@code initial} seconds before the first tick. A restart after {@link
   * #stop()} waits one full {@code interval} instead.
   *
   * @throws IllegalArgumentException if {@code interval} is shorter than one millisecond
   */
  public void start() {
    if (this.isRunning) {
      return;
    }

    // Validate before touching any state so a failed start leaves the scheduler unchanged.
    long periodMillis = toMillis(this.interval);
    if (periodMillis <= 0) {
      throw new IllegalArgumentException("interval must be positive, got " + this.interval);
    }

    float delaySeconds = this.interval;
    if (!this.ranInitial) {
      delaySeconds = this.initial;
      this.ranInitial = true;
    }

    this.scheduledService = Executors.newSingleThreadScheduledExecutor();
    // Milliseconds keep fractional-second values that a cast to whole seconds would drop.
    this.scheduledService.scheduleAtFixedRate(
        this::run, toMillis(delaySeconds), periodMillis, TimeUnit.MILLISECONDS);
    this.isRunning = true;
  }

  /** Stops future ticks by shutting down the executor. Safe to call more than once. */
  public void stop() {
    if (this.scheduledService != null) {
      this.scheduledService.shutdown();
    }
    this.isRunning = false;
  }

  /** Converts seconds to whole milliseconds, clamping negative values to zero. */
  private static long toMillis(float seconds) {
    return Math.max(0L, Math.round(seconds * 1000.0));
  }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
package com.whylogs.api.logger.rollingLogger;
2+
3+
import com.whylogs.api.logger.Logger;
4+
import com.whylogs.api.writer.Writer;
5+
import com.whylogs.core.DatasetProfile;
6+
import com.whylogs.core.schemas.DatasetSchema;
7+
import java.time.Instant;
8+
import java.util.ArrayList;
9+
import java.util.Map;
10+
import java.util.concurrent.Callable;
11+
12+
public class TimedRollingLogger extends Logger implements AutoCloseable {
13+
// A rolling logger that continuously rotates files based on time
14+
private DatasetSchema schema;
15+
private String baseName;
16+
private String fileExtension;
17+
private int interval;
18+
private Character when = 'H'; // TODO: Make the Literals of S M H D
19+
private boolean utc = false;
20+
private boolean align = true;
21+
private boolean skipEmpty = false;
22+
private String suffix;
23+
24+
private DatasetProfile currentProfile;
25+
private Callable<Writer> callback; // TODO: this isn't the write signature
26+
private Scheduler scheduler;
27+
private int currentBatchTimestamp;
28+
29+
// TODO: callback: Optional[Callable[[Writer, DatasetProfileView, str], None]]
30+
public TimedRollingLogger(
31+
DatasetSchema schema, String baseName, String fileExtension, int interval) {
32+
this(schema, baseName, fileExtension, interval, 'H', false, true, false);
33+
}
34+
35+
public TimedRollingLogger(
36+
DatasetSchema schema, String baseName, String fileExtension, int interval, Character when) {
37+
this(schema, baseName, fileExtension, interval, when, false, true, false);
38+
}
39+
40+
public TimedRollingLogger(
41+
DatasetSchema schema,
42+
String baseName,
43+
String fileExtension,
44+
int interval,
45+
Character when,
46+
boolean utc,
47+
boolean align,
48+
boolean skipEmpty) {
49+
super(schema);
50+
51+
this.schema = schema;
52+
this.baseName = baseName;
53+
this.fileExtension = fileExtension;
54+
this.interval = interval;
55+
this.when = Character.toUpperCase(when);
56+
this.utc = utc;
57+
this.align = align;
58+
this.skipEmpty = skipEmpty;
59+
60+
if (this.baseName == null || this.baseName.isEmpty()) {
61+
this.baseName = "profile";
62+
}
63+
if (this.fileExtension == null || this.fileExtension.isEmpty()) {
64+
this.fileExtension = ".bin"; // TODO: should we make this .whylogs?
65+
}
66+
67+
switch (this.when) {
68+
case 'S':
69+
this.interval = 1; // one second
70+
this.suffix = "%Y-%m-%d_%H-%M-%S";
71+
break;
72+
case 'M':
73+
this.interval = 60; // one minute
74+
this.suffix = "%Y-%m-%d_%H-%M";
75+
break;
76+
case 'H':
77+
this.interval = 60 * 60; // one hour
78+
this.suffix = "%Y-%m-%d_%H";
79+
break;
80+
case 'D':
81+
this.interval = 60 * 60 * 24; // one day
82+
this.suffix = "%Y-%m-%d";
83+
break;
84+
default:
85+
throw new IllegalArgumentException(
86+
"Invalid value for when: " + this.when + ". Must be S, M, H, or D");
87+
}
88+
89+
this.interval = this.interval * interval; // / multiply by units requested
90+
this.utc = utc;
91+
92+
Instant currentTime = Instant.now();
93+
this.currentBatchTimestamp = this.computeCurrentBatchTimestamp(currentTime.getEpochSecond());
94+
this.currentProfile = new DatasetProfile(schema, currentTime, currentTime);
95+
int initialRunAfter =
96+
(this.currentBatchTimestamp + this.interval) - (int) currentTime.getEpochSecond();
97+
if (initialRunAfter < 0) {
98+
// TODO: Add logging error as this shouldn't happen
99+
initialRunAfter = this.interval;
100+
}
101+
102+
this.scheduler = new Scheduler(initialRunAfter, this.interval, this::doRollover, null);
103+
this.scheduler.start();
104+
105+
// autocloseable closes at end
106+
}
107+
108+
private int computeCurrentBatchTimestamp(long nowEpoch) {
109+
int roundedNow = (int) nowEpoch; // rounds by going from an long to a int (truncates)
110+
if (this.align) {
111+
return (Math.floorDiv((roundedNow - 1), this.interval)) * this.interval + this.interval;
112+
}
113+
return roundedNow;
114+
}
115+
116+
public void checkWriter(Writer writer) {
117+
writer.check_interval(this.interval);
118+
}
119+
120+
private ArrayList<DatasetProfile> getMatchingProfiles() {
121+
ArrayList<DatasetProfile> matchingProfiles = new ArrayList<>();
122+
matchingProfiles.add(this.currentProfile);
123+
return matchingProfiles;
124+
}
125+
126+
@Override
127+
protected ArrayList<DatasetProfile> getMatchingProfiles(Object data) {
128+
return this.getMatchingProfiles();
129+
}
130+
131+
@Override
132+
protected <O> ArrayList<DatasetProfile> getMatchingProfiles(Map<String, O> data) {
133+
return this.getMatchingProfiles();
134+
}
135+
136+
private void doRollover() {
137+
if (this.isClosed()) {
138+
return;
139+
}
140+
141+
DatasetProfile oldProfile = this.currentProfile;
142+
Instant currentTime = Instant.now();
143+
this.currentBatchTimestamp = this.computeCurrentBatchTimestamp(currentTime.getEpochSecond());
144+
this.currentProfile = new DatasetProfile(schema, currentTime, currentTime);
145+
146+
this.flush(oldProfile);
147+
}
148+
149+
private void flush(DatasetProfile profile) {
150+
if (profile == null) {
151+
return;
152+
} else if (this.skipEmpty && profile.isEmpty()) {
153+
// set logger logger.debug("skip_empty is set. Skipping empty profiles")
154+
return;
155+
}
156+
157+
// get time to get name
158+
String timedFileName = this.baseName + "_" + this.currentBatchTimestamp + this.fileExtension;
159+
160+
// Sleep while the profile is active?
161+
// TODO: this is where we call the store list.write
162+
// TODO: go through through the writers
163+
}
164+
165+
public void close() {
166+
// TODO log that we are closing the writer
167+
if (!this.isClosed()) {
168+
// Autoclose handles the isCLosed()
169+
this.scheduler.stop();
170+
this.flush(this.currentProfile);
171+
}
172+
}
173+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.whylogs.api.writer;
2+
3+
import java.io.File;
4+
import java.io.FileWriter;
5+
6+
/** Helpers shared by objects that can be written to a file. */
public interface Writable {

  /**
   * Opens {@code path} for writing in append mode, creating any missing parent directories first.
   *
   * <p>Failures are reported on standard output and the stack trace is printed; they are not
   * propagated. The caller owns the returned writer and is responsible for closing it.
   *
   * @param path file to open; it is created if it does not exist
   * @return a writer positioned at the end of the file, or {@code null} if it could not be opened
   */
  static FileWriter safeOpenWrite(String path) {
    File file = new File(path);
    FileWriter writer = null;
    try {
      File parent = file.getParentFile();
      if (parent != null && !parent.isDirectory()) {
        // Result deliberately ignored: if creation fails, opening the file below throws and the
        // failure is reported by the catch block.
        parent.mkdirs();
      }
      writer = new FileWriter(file, true);
    } catch (Exception e) {
      System.out.println("Error: " + e);
      e.printStackTrace();
    }

    // The caller closes the writer later on.
    return writer;
  }
}

0 commit comments

Comments
 (0)